You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/10/02 17:16:10 UTC
svn commit: r1003810 - in /camel/trunk/camel-core/src:
main/java/org/apache/camel/ main/java/org/apache/camel/builder/
main/java/org/apache/camel/processor/ main/java/org/apache/camel/util/
test/java/org/apache/camel/builder/ test/java/org/apache/camel...
Author: davsclaus
Date: Sat Oct 2 15:16:09 2010
New Revision: 1003810
URL: http://svn.apache.org/viewvc?rev=1003810&view=rev
Log:
CAMEL-3188: Fixed concurrency issue with binary predicates.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java
- copied, changed from r1003532, camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java
camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/BinaryPredicate.java Sat Oct 2 15:16:09 2010
@@ -52,14 +52,18 @@ public interface BinaryPredicate extends
Expression getRight();
/**
- * Gets the evaluated left hand side value
+ * Gets the evaluated left hand side value.
+ * <p/>
+ * Beware of thread safety: this value and the result of {@link #getRightValue()} may in fact be from different evaluations.
*
* @return the left value, may be <tt>null</tt> if predicate has not been matched yet.
*/
Object getLeftValue();
/**
- * Gets the evaluated right hand side value
+ * Gets the evaluated right hand side value.
+ * <p/>
+ * Beware of thread safety: this value and the result of {@link #getLeftValue()} may in fact be from different evaluations.
*
* @return the right value, may be <tt>null</tt> if predicate has not been matched yet.
*/
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/builder/BinaryPredicateSupport.java Sat Oct 2 15:16:09 2010
@@ -32,8 +32,8 @@ public abstract class BinaryPredicateSup
private final Expression left;
private final Expression right;
- private Object leftValue;
- private Object rightValue;
+ private Object lastLeftValue;
+ private Object lastRightValue;
protected BinaryPredicateSupport(Expression left, Expression right) {
notNull(left, "left");
@@ -49,8 +49,12 @@ public abstract class BinaryPredicateSup
}
public boolean matches(Exchange exchange) {
- leftValue = left.evaluate(exchange, Object.class);
- rightValue = right.evaluate(exchange, Object.class);
+ // must be thread safe and store result in local objects
+ Object leftValue = left.evaluate(exchange, Object.class);
+ Object rightValue = right.evaluate(exchange, Object.class);
+ // remember last result (may not be thread safe)
+ lastRightValue = rightValue;
+ lastLeftValue = leftValue;
return matches(exchange, leftValue, rightValue);
}
@@ -71,10 +75,10 @@ public abstract class BinaryPredicateSup
}
public Object getRightValue() {
- return rightValue;
+ return lastRightValue;
}
public Object getLeftValue() {
- return leftValue;
+ return lastLeftValue;
}
}
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/ChoiceProcessor.java Sat Oct 2 15:16:09 2010
@@ -29,6 +29,8 @@ import org.apache.camel.impl.ServiceSupp
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.ServiceHelper;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* Implements a Choice structure where one or more predicates are used which if
@@ -38,6 +40,7 @@ import org.apache.camel.util.ServiceHelp
* @version $Revision$
*/
public class ChoiceProcessor extends ServiceSupport implements AsyncProcessor, Navigate<Processor>, Traceable {
+ private static final transient Log LOG = LogFactory.getLog(ChoiceProcessor.class);
private final List<FilterProcessor> filters;
private final AsyncProcessor otherwise;
@@ -51,8 +54,9 @@ public class ChoiceProcessor extends Ser
}
public boolean process(Exchange exchange, AsyncCallback callback) {
- for (FilterProcessor filterProcessor : filters) {
- Predicate predicate = filterProcessor.getPredicate();
+ for (int i = 0; i < filters.size(); i++) {
+ FilterProcessor filter = filters.get(i);
+ Predicate predicate = filter.getPredicate();
boolean matches = false;
try {
@@ -66,10 +70,14 @@ public class ChoiceProcessor extends Ser
return true;
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("#" + i + " - " + predicate + " matches: " + matches + " for: " + exchange);
+ }
+
if (matches) {
// process next will also take care (has not null test) if next was a stop().
// stop() has no processor to execute, and thus we will end in a NPE
- return filterProcessor.processNext(exchange, callback);
+ return filter.processNext(exchange, callback);
}
}
if (otherwise != null) {
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/util/ObjectHelper.java Sat Oct 2 15:16:09 2010
@@ -19,7 +19,6 @@ package org.apache.camel.util;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
-import java.io.IOException;
import java.io.InputStream;
import java.lang.annotation.Annotation;
import java.lang.reflect.AnnotatedElement;
@@ -68,18 +67,26 @@ public final class ObjectHelper {
/**
* A helper method for comparing objects for equality in which it uses type coerce to coerce
* types between the left and right values. This allows you to equal test eg String and Integer as
- * Camel will be able to coerce the types
+ * Camel will be able to coerce the types.
*/
public static boolean typeCoerceEquals(TypeConverter converter, Object leftValue, Object rightValue) {
+ // sanity check
+ if (leftValue == null && rightValue == null) {
+ // they are equal
+ return true;
+ } else if (leftValue == null || rightValue == null) {
+ // only one of them is null so they are not equal
+ return false;
+ }
+
// try without type coerce
boolean answer = equal(leftValue, rightValue);
if (answer) {
return true;
}
- if (leftValue == null || rightValue == null) {
- // no reason to continue as the first equal did not match and now one of the values is null
- // so it wont help to type coerce to a null type
+ // are they the same type? if so return false, as equals already returned false
+ if (leftValue.getClass().isInstance(rightValue)) {
return false;
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java (from r1003532, camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java&r1=1003532&r2=1003810&rev=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderConcurrentTest.java Sat Oct 2 15:16:09 2010
@@ -16,120 +16,68 @@
*/
package org.apache.camel.builder;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
-import org.apache.camel.Message;
+import org.apache.camel.Expression;
import org.apache.camel.Predicate;
-import org.apache.camel.TestSupport;
-import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.impl.DefaultExchange;
-import static org.apache.camel.builder.Builder.constant;
-import static org.apache.camel.builder.PredicateBuilder.in;
-import static org.apache.camel.builder.PredicateBuilder.not;
/**
* @version $Revision$
*/
-public class PredicateBuilderTest extends TestSupport {
- protected Exchange exchange = new DefaultExchange(new DefaultCamelContext());
-
- public void testRegexPredicates() throws Exception {
- assertMatches(header("location").regex("[a-zA-Z]+,London,UK"));
- assertDoesNotMatch(header("location").regex("[a-zA-Z]+,Westminster,[a-zA-Z]+"));
- }
-
- public void testPredicates() throws Exception {
- assertMatches(header("name").isEqualTo(constant("James")));
- assertMatches(not(header("name").isEqualTo(constant("Claus"))));
- }
-
- public void testFailingPredicates() throws Exception {
- assertDoesNotMatch(header("name").isEqualTo(constant("Hiram")));
- assertDoesNotMatch(header("size").isGreaterThan(constant(100)));
- assertDoesNotMatch(not(header("size").isLessThan(constant(100))));
- }
-
- public void testCompoundOrPredicates() throws Exception {
- Predicate p1 = header("name").isEqualTo(constant("Hiram"));
- Predicate p2 = header("size").isGreaterThanOrEqualTo(constant(10));
- Predicate or = PredicateBuilder.or(p1, p2);
-
- assertMatches(or);
- }
-
- public void testCompoundAndPredicates() throws Exception {
- Predicate p1 = header("name").isEqualTo(constant("James"));
- Predicate p2 = header("size").isGreaterThanOrEqualTo(constant(10));
- Predicate and = PredicateBuilder.and(p1, p2);
-
- assertMatches(and);
- }
-
- public void testCompoundAndOrPredicates() throws Exception {
- Predicate p1 = header("name").isEqualTo(constant("Hiram"));
- Predicate p2 = header("size").isGreaterThan(constant(100));
- Predicate p3 = header("location").contains("London");
- Predicate and = PredicateBuilder.and(p1, p2);
- Predicate andor = PredicateBuilder.or(and, p3);
-
- assertMatches(andor);
- }
-
- public void testPredicateIn() throws Exception {
- assertMatches(in(header("name").isEqualTo("Hiram"), header("name").isEqualTo("James")));
- }
-
- public void testValueIn() throws Exception {
- assertMatches(header("name").in("Hiram", "Jonathan", "James", "Claus"));
- }
-
- public void testStartsWith() throws Exception {
- assertMatches(header("name").startsWith("J"));
- assertMatches(header("name").startsWith("James"));
- assertDoesNotMatch(header("name").startsWith("C"));
-
- assertMatches(header("size").startsWith("1"));
- assertMatches(header("size").startsWith("10"));
- assertDoesNotMatch(header("size").startsWith("99"));
- assertDoesNotMatch(header("size").startsWith("9"));
-
- assertMatches(header("size").startsWith(1));
- assertMatches(header("size").startsWith(10));
- assertDoesNotMatch(header("size").startsWith(99));
- assertDoesNotMatch(header("size").startsWith(9));
- }
-
- public void testEndsWith() throws Exception {
- assertMatches(header("name").endsWith("mes"));
- assertMatches(header("name").endsWith("James"));
- assertDoesNotMatch(header("name").endsWith("world"));
-
- assertMatches(header("size").endsWith("0"));
- assertMatches(header("size").endsWith("10"));
- assertDoesNotMatch(header("size").endsWith("99"));
- assertDoesNotMatch(header("size").endsWith("9"));
-
- assertMatches(header("size").endsWith(0));
- assertMatches(header("size").endsWith(10));
- assertDoesNotMatch(header("size").endsWith(99));
- assertDoesNotMatch(header("size").endsWith(9));
- }
+public class PredicateBuilderConcurrentTest extends ContextTestSupport {
@Override
- protected void setUp() throws Exception {
- super.setUp();
- Message in = exchange.getIn();
- in.setBody("Hello there!");
- in.setHeader("name", "James");
- in.setHeader("location", "Islington,London,UK");
- in.setHeader("size", 10);
+ public boolean isUseRouteBuilder() {
+ return false;
}
- protected void assertMatches(Predicate predicate) {
- assertPredicateMatches(predicate, exchange);
- }
+ public void testPredicateBuilderConcurrent() throws Exception {
+ List<Future<Boolean>> futures = new ArrayList<Future<Boolean>>();
+
+ ExecutorService pool = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 1000; i++) {
+ final Integer num = i;
+ Future<Boolean> future = pool.submit(new Callable<Boolean>() {
+ public Boolean call() throws Exception {
+ Expression left = ExpressionBuilder.headerExpression("foo");
+ Expression right;
+ if (num % 2 == 0) {
+ right = ExpressionBuilder.constantExpression("ABC");
+ } else {
+ right = ExpressionBuilder.constantExpression("DEF");
+ }
+ Predicate predicate = PredicateBuilder.isEqualTo(left, right);
+
+ Exchange exchange = new DefaultExchange(context);
+ exchange.getIn().setBody("Hello World");
+ exchange.getIn().setHeader("foo", "ABC");
+
+ return predicate.matches(exchange);
+ }
+ });
+
+ futures.add(future);
+ }
+
+ for (int i = 0; i < 1000; i++) {
+ Boolean result = futures.get(i).get(10, TimeUnit.SECONDS);
+ if (i % 2 == 0) {
+ assertEquals("Should be true for #" + i, true, result.booleanValue());
+ } else {
+ assertEquals("Should be false for #" + i, false, result.booleanValue());
+ }
+ }
- protected void assertDoesNotMatch(Predicate predicate) {
- assertPredicateDoesNotMatch(predicate, exchange);
+ pool.shutdownNow();
}
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java?rev=1003810&r1=1003809&r2=1003810&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/builder/PredicateBuilderTest.java Sat Oct 2 15:16:09 2010
@@ -40,6 +40,9 @@ public class PredicateBuilderTest extend
public void testPredicates() throws Exception {
assertMatches(header("name").isEqualTo(constant("James")));
assertMatches(not(header("name").isEqualTo(constant("Claus"))));
+
+ assertMatches(header("size").isEqualTo(10));
+ assertMatches(header("size").isEqualTo("10"));
}
public void testFailingPredicates() throws Exception {
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java?rev=1003810&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/issues/CBRConcurrencyIssueTest.java Sat Oct 2 15:16:09 2010
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.issues;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class CBRConcurrencyIssueTest extends ContextTestSupport {
+
+ public void testCBRConcurrencyIssue() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("Hello World");
+ getMockEndpoint("mock:other").expectedBodiesReceived("Bye World");
+
+ template.sendBodyAndHeader("seda:start", "Hello World", "foo", "send");
+ template.sendBodyAndHeader("seda:start", "Bye World", "foo", "receive");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testCBRConcurrencyManyMessagesIssue() throws Exception {
+ getMockEndpoint("mock:result").expectedMessageCount(50);
+ getMockEndpoint("mock:other").expectedMessageCount(150);
+
+ for (int i = 0; i < 200; i++) {
+ if (i % 4 == 0) {
+ template.sendBodyAndHeader("seda:start", "Hello World", "foo", "send");
+ } else {
+ template.sendBodyAndHeader("seda:start", "Bye World", "foo", "receive");
+ }
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() {
+ return new RouteBuilder() {
+ public void configure() {
+ from("seda:start?concurrentConsumers=10")
+ .log("Got foo ${header.foo} header")
+ .choice()
+ .when(header("foo").isEqualTo("send")).to("mock:result")
+ .when(header("foo").isEqualTo("receive")).to("mock:other");
+ }
+ };
+ }
+
+}