You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/10/02 17:16:10 UTC

svn commit: r1003810 - in /camel/trunk/camel-core/src: main/java/org/apache/camel/ main/java/org/apache/camel/builder/ main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/ test/java/org/apache/camel/builder/ test/java/org/apache/camel...

Author: davsclaus
Date: Sat Oct  2 15:16:09 2010
New Revision: 1003810

URL: http://svn.apache.org/viewvc?rev=1003810&view=rev
Log:
CAMEL-3188: Fixed concurrency issue with binary predicates.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java
      - copied, changed from r1003532, camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java Sat Oct  2 15:16:09 2010
@@ -52,14 +52,18 @@ public interface BinaryPredicate extends
     Expression getRight();
 
     /**
-     * Gets the evaluated left hand side value
+     * Gets the evaluated left hand side value.
+     * <p/>
+     * Beware of thread safety that the result of the {@link #getRightValue()} may in fact be from another evaluation.
      *
      * @return the left value, may be <tt>null</tt> if predicate has not been matched yet.
      */
     Object getLeftValue();
 
     /**
-     * Gets the evaluated right hand side value
+     * Gets the evaluated right hand side value.
+     * <p/>
+     * Beware of thread safety that the result of the {@link #getLeftValue()} may in fact be from another evaluation.
      *
      * @return the right value, may be <tt>null</tt> if predicate has not been matched yet.
      */

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java Sat Oct  2 15:16:09 2010
@@ -32,8 +32,8 @@ public abstract class BinaryPredicateSup
 
     private final Expression left;
     private final Expression right;
-    private Object leftValue;
-    private Object rightValue;
+    private Object lastLeftValue;
+    private Object lastRightValue;
 
     protected BinaryPredicateSupport(Expression left, Expression right) {
         notNull(left, "left");
@@ -49,8 +49,12 @@ public abstract class BinaryPredicateSup
     }
 
     public boolean matches(Exchange exchange) {
-        leftValue = left.evaluate(exchange, Object.class);
-        rightValue = right.evaluate(exchange, Object.class);
+        // must be thread safe and store result in local objects
+        Object leftValue = left.evaluate(exchange, Object.class);
+        Object rightValue = right.evaluate(exchange, Object.class);
+        // remember last result (may not be thread safe)
+        lastRightValue = rightValue;
+        lastLeftValue = leftValue;
         return matches(exchange, leftValue, rightValue);
     }
 
@@ -71,10 +75,10 @@ public abstract class BinaryPredicateSup
     }
 
     public Object getRightValue() {
-        return rightValue;
+        return lastRightValue;
     }
 
     public Object getLeftValue() {
-        return leftValue;
+        return lastLeftValue;
     }
 }

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java Sat Oct  2 15:16:09 2010
@@ -29,6 +29,8 @@ import org.apache.camel.impl.ServiceSupp
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.util.AsyncProcessorHelper;
 import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 
 /**
  * Implements a Choice structure where one or more predicates are used which if
@@ -38,6 +40,7 @@ import org.apache.camel.util.ServiceHelp
  * @version $Revision$
  */
 public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
+    private static final transient Log LOG = LogFactory.getLog(ChoiceProcessor.class);
     private final List<FilterProcessor> filters;
     private final AsyncProcessor otherwise;
 
@@ -51,8 +54,9 @@ public class ChoiceProcessor extends Ser
     }
 
     public boolean process(Exchange exchange, AsyncCallback callback) {
-        for (FilterProcessor filterProcessor : filters) {
-            Predicate predicate = filterProcessor.getPredicate();
+        for (int i = 0; i < filters.size(); i++) {
+            FilterProcessor filter = filters.get(i);
+            Predicate predicate = filter.getPredicate();
 
             boolean matches = false;
             try {
@@ -66,10 +70,14 @@ public class ChoiceProcessor extends Ser
                 return true;
             }
 
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("#" + i + " - " + predicate + " matches: " + matches + " for: " + exchange);
+            }
+
             if (matches) {
                 // process next will also take care (has not null test) if next was a stop().
                 // stop() has no processor to execute, and thus we will end in a NPE
-                return filterProcessor.processNext(exchange, callback);
+                return filter.processNext(exchange, callback);
             }
         }
         if (otherwise != null) {

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java Sat Oct  2 15:16:09 2010
@@ -19,7 +19,6 @@ package org.apache.camel.util;
 import java.io.Closeable;
 import java.io.File;
 import java.io.FileNotFoundException;
-import java.io.IOException;
 import java.io.InputStream;
 import java.lang.annotation.Annotation;
 import java.lang.reflect.AnnotatedElement;
@@ -68,18 +67,26 @@ public final class ObjectHelper {
     /**
      * A helper method for comparing objects for equality in which it uses type coerce to coerce
      * types between the left and right values. This allows you to equal test eg String and Integer as
-     * Camel will be able to coerce the types
+     * Camel will be able to coerce the types.
      */
     public static boolean typeCoerceEquals(TypeConverter converter, Object leftValue, Object rightValue) {
+        // sanity check
+        if (leftValue == null && rightValue == null) {
+            // they are equal
+            return true;
+        } else if (leftValue == null || rightValue == null) {
+            // only one of them is null so they are not equal
+            return false;
+        }
+
         // try without type coerce
         boolean answer = equal(leftValue, rightValue);
         if (answer) {
             return true;
         }
 
-        if (leftValue == null || rightValue == null) {
-            // no reason to continue as the first equal did not match and now one of the values is null
-            // so it wont help to type coerce to a null type
+        // are they same type, if so return false as the equals returned false
+        if (leftValue.getClass().isInstance(rightValue)) {
             return false;
         }
 

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java (from r1003532, camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java&r1=1003532&r2=1003810&rev=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java Sat Oct  2 15:16:09 2010
@@ -16,120 +16,68 @@
  */
 package org.apache.camel.builder;
 
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
+import org.apache.camel.Expression;
 import org.apache.camel.Predicate;
-import org.apache.camel.TestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
 import org.apache.camel.impl.DefaultExchange;
-import static org.apache.camel.builder.Builder.constant;
-import static org.apache.camel.builder.PredicateBuilder.in;
-import static org.apache.camel.builder.PredicateBuilder.not;
 
 /**
  * @version $Revision$
  */
-public class PredicateBuilderTest extends TestSupport {
-    protected Exchange exchange = new DefaultExchange(new DefaultCamelContext());
-
-    public void testRegexPredicates() throws Exception {
-        assertMatches(header("location").regex("[a-zA-Z]+,London,UK"));
-        assertDoesNotMatch(header("location").regex("[a-zA-Z]+,Westminster,[a-zA-Z]+"));
-    }
-
-    public void testPredicates() throws Exception {
-        assertMatches(header("name").isEqualTo(constant("James")));
-        assertMatches(not(header("name").isEqualTo(constant("Claus"))));
-    }
-
-    public void testFailingPredicates() throws Exception {
-        assertDoesNotMatch(header("name").isEqualTo(constant("Hiram")));
-        assertDoesNotMatch(header("size").isGreaterThan(constant(100)));
-        assertDoesNotMatch(not(header("size").isLessThan(constant(100))));
-    }
-
-    public void testCompoundOrPredicates() throws Exception {
-        Predicate p1 = header("name").isEqualTo(constant("Hiram"));
-        Predicate p2 = header("size").isGreaterThanOrEqualTo(constant(10));
-        Predicate or = PredicateBuilder.or(p1, p2);
-
-        assertMatches(or);
-    }
-
-    public void testCompoundAndPredicates() throws Exception {
-        Predicate p1 = header("name").isEqualTo(constant("James"));
-        Predicate p2 = header("size").isGreaterThanOrEqualTo(constant(10));
-        Predicate and = PredicateBuilder.and(p1, p2);
-
-        assertMatches(and);
-    }
-
-    public void testCompoundAndOrPredicates() throws Exception {
-        Predicate p1 = header("name").isEqualTo(constant("Hiram"));
-        Predicate p2 = header("size").isGreaterThan(constant(100));
-        Predicate p3 = header("location").contains("London");
-        Predicate and = PredicateBuilder.and(p1, p2);
-        Predicate andor = PredicateBuilder.or(and, p3);
-
-        assertMatches(andor);
-    }
-
-    public void testPredicateIn() throws Exception {
-        assertMatches(in(header("name").isEqualTo("Hiram"), header("name").isEqualTo("James")));
-    }
-
-    public void testValueIn() throws Exception {
-        assertMatches(header("name").in("Hiram", "Jonathan", "James", "Claus"));
-    }
-
-    public void testStartsWith() throws Exception {
-        assertMatches(header("name").startsWith("J"));
-        assertMatches(header("name").startsWith("James"));
-        assertDoesNotMatch(header("name").startsWith("C"));
-
-        assertMatches(header("size").startsWith("1"));
-        assertMatches(header("size").startsWith("10"));
-        assertDoesNotMatch(header("size").startsWith("99"));
-        assertDoesNotMatch(header("size").startsWith("9"));
-
-        assertMatches(header("size").startsWith(1));
-        assertMatches(header("size").startsWith(10));
-        assertDoesNotMatch(header("size").startsWith(99));
-        assertDoesNotMatch(header("size").startsWith(9));
-    }
-
-    public void testEndsWith() throws Exception {
-        assertMatches(header("name").endsWith("mes"));
-        assertMatches(header("name").endsWith("James"));
-        assertDoesNotMatch(header("name").endsWith("world"));
-
-        assertMatches(header("size").endsWith("0"));
-        assertMatches(header("size").endsWith("10"));
-        assertDoesNotMatch(header("size").endsWith("99"));
-        assertDoesNotMatch(header("size").endsWith("9"));
-
-        assertMatches(header("size").endsWith(0));
-        assertMatches(header("size").endsWith(10));
-        assertDoesNotMatch(header("size").endsWith(99));
-        assertDoesNotMatch(header("size").endsWith(9));
-    }
+public class PredicateBuilderConcurrentTest extends ContextTestSupport {
 
     @Override
-    protected void setUp() throws Exception {
-        super.setUp();
-        Message in = exchange.getIn();
-        in.setBody("Hello there!");
-        in.setHeader("name", "James");
-        in.setHeader("location", "Islington,London,UK");
-        in.setHeader("size", 10);
+    public boolean isUseRouteBuilder() {
+        return false;
     }
 
-    protected void assertMatches(Predicate predicate) {
-        assertPredicateMatches(predicate, exchange);
-    }
+    public void testPredicateBuilderConcurrent() throws Exception {
+        List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
+
+        ExecutorService pool = Executors.newFixedThreadPool(10);
+        for (int i = 0; i < 1000; i++) {
+            final Integer num = i;
+            Future<Boolean> future = pool.submit(new Callable<Boolean>() {
+                public Boolean call() throws Exception {
+                    Expression left = ExpressionBuilder.headerExpression("foo");
+                    Expression right;
+                    if (num % 2 == 0) {
+                        right = ExpressionBuilder.constantExpression("ABC");
+                    } else {
+                        right = ExpressionBuilder.constantExpression("DEF");
+                    }
+                    Predicate predicate = PredicateBuilder.isEqualTo(left, right);
+
+                    Exchange exchange = new DefaultExchange(context);
+                    exchange.getIn().setBody("Hello World");
+                    exchange.getIn().setHeader("foo", "ABC");
+
+                    return predicate.matches(exchange);
+                }
+            });
+
+            futures.add(future);
+        }
+
+        for (int i = 0; i < 1000; i++) {
+            Boolean result = futures.get(i).get(10, TimeUnit.SECONDS);
+            if (i % 2 == 0) {
+                assertEquals("Should be true for #" + i, true, result.booleanValue());
+            } else {
+                assertEquals("Should be false for #" + i, false, result.booleanValue());
+            }
+        }
 
-    protected void assertDoesNotMatch(Predicate predicate) {
-        assertPredicateDoesNotMatch(predicate, exchange);
+        pool.shutdownNow();
     }
 
 }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java Sat Oct  2 15:16:09 2010
@@ -40,6 +40,9 @@ public class PredicateBuilderTest extend
     public void testPredicates() throws Exception {
         assertMatches(header("name").isEqualTo(constant("James")));
         assertMatches(not(header("name").isEqualTo(constant("Claus"))));
+
+        assertMatches(header("size").isEqualTo(10));
+        assertMatches(header("size").isEqualTo("10"));
     }
 
     public void testFailingPredicates() throws Exception {

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java?rev=1003810&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java Sat Oct  2 15:16:09 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class CBRConcurrencyIssueTest extends ContextTestSupport {
+
+    public void testCBRConcurrencyIssue() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:other").expectedBodiesReceived("Bye World");
+
+        template.sendBodyAndHeader("seda:start", "Hello World", "foo", "send");
+        template.sendBodyAndHeader("seda:start", "Bye World", "foo", "receive");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testCBRConcurrencyManyMessagesIssue() throws Exception {
+        getMockEndpoint("mock:result").expectedMessageCount(50);
+        getMockEndpoint("mock:other").expectedMessageCount(150);
+        
+        for (int i = 0; i < 200; i++) {
+            if (i % 4 == 0) {
+                template.sendBodyAndHeader("seda:start", "Hello World", "foo", "send");
+            } else {
+                template.sendBodyAndHeader("seda:start", "Bye World", "foo", "receive");
+            }
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() {
+        return new RouteBuilder() {
+            public void configure() {
+                from("seda:start?concurrentConsumers=10")
+                    .log("Got foo ${header.foo} header")
+                    .choice()
+                        .when(header("foo").isEqualTo("send")).to("mock:result")
+                        .when(header("foo").isEqualTo("receive")).to("mock:other");
+            }
+        };
+    }
+
+}