You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/11/13 14:42:46 UTC
svn commit: r1034759 - in /camel/trunk: ./
camel-core/src/main/java/org/apache/camel/processor/
camel-core/src/test/java/org/apache/camel/processor/
Author: davsclaus
Date: Sat Nov 13 13:42:45 2010
New Revision: 1034759
URL: http://svn.apache.org/viewvc?rev=1034759&view=rev
Log:
CAMEL-3333: Fixed splitter with nested multicast having its UseOriginalAggregationStrategy being removed mistakenly by the nested multicast.
Added:
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java
camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
camel/trunk/pom.xml
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1034759&r1=1034758&r2=1034759&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Nov 13 13:42:45 2010
@@ -18,17 +18,17 @@ package org.apache.camel.processor;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,12 +38,10 @@ import org.apache.camel.CamelContext;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Endpoint;
import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTimedOutException;
import org.apache.camel.Navigate;
import org.apache.camel.Processor;
import org.apache.camel.Producer;
import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.impl.DefaultExchange;
import org.apache.camel.impl.ServiceSupport;
import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -51,6 +49,7 @@ import org.apache.camel.processor.aggreg
import org.apache.camel.spi.RouteContext;
import org.apache.camel.spi.TracedRouteNodes;
import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CastUtils;
import org.apache.camel.util.EventHelper;
import org.apache.camel.util.ExchangeHelper;
import org.apache.camel.util.ObjectHelper;
@@ -542,7 +541,7 @@ public class MulticastProcessor extends
*/
protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync) {
// cleanup any per exchange aggregation strategy
- original.removeProperty(Exchange.AGGREGATION_STRATEGY);
+ removeAggregationStrategyFromExchange(original);
if (original.getException() != null) {
// multicast uses error handling on its output processors and they have tried to redeliver
// so we shall signal back to the other error handlers that we are exhausted and they should not
@@ -660,7 +659,11 @@ public class MulticastProcessor extends
// prefer to use per Exchange aggregation strategy over a global strategy
if (exchange != null) {
- answer = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, AggregationStrategy.class);
+ Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
+ Map<Object, AggregationStrategy> map = CastUtils.cast(property);
+ if (map != null) {
+ answer = map.get(this);
+ }
}
if (answer == null) {
// fallback to global strategy
@@ -670,6 +673,40 @@ public class MulticastProcessor extends
}
/**
+ * Sets the given {@link org.apache.camel.processor.aggregate.AggregationStrategy} on the {@link Exchange}.
+ *
+ * @param exchange the exchange
+ * @param aggregationStrategy the strategy
+ */
+ protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
+ Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
+ Map<Object, AggregationStrategy> map = CastUtils.cast(property);
+ if (map == null) {
+ map = new HashMap<Object, AggregationStrategy>();
+ }
+ // store the strategy using this processor as the key
+ // (so we can store multiple strategies on the same exchange)
+ map.put(this, aggregationStrategy);
+ exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map);
+ }
+
+ /**
+ * Removes the associated {@link org.apache.camel.processor.aggregate.AggregationStrategy} from the {@link Exchange}
+ * which must be done after use.
+ *
+ * @param exchange the current exchange
+ */
+ protected void removeAggregationStrategyFromExchange(Exchange exchange) {
+ Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
+ Map<Object, AggregationStrategy> map = CastUtils.cast(property);
+ if (map == null) {
+ return;
+ }
+ // remove the strategy using this processor as the key
+ map.remove(this);
+ }
+
+ /**
* Is the multicast processor working in streaming mode?
* <p/>
* In streaming mode:
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1034759&r1=1034758&r2=1034759&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Sat Nov 13 13:42:45 2010
@@ -88,7 +88,7 @@ public class Splitter extends MulticastP
// to ensure it supports async routing
if (strategy == null) {
UseOriginalAggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
- exchange.setProperty(Exchange.AGGREGATION_STRATEGY, original);
+ setAggregationStrategyOnExchange(exchange, original);
}
return super.process(exchange, callback);
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java?rev=1034759&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java Sat Nov 13 13:42:45 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SplitterWithDualMulticastTest extends ContextTestSupport {
+
+ public void testSplitterWithDualMulticast() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C");
+ // should get the original input message without any headers etc
+ getMockEndpoint("mock:result").allMessages().header("bar").isNull();
+ getMockEndpoint("mock:result").allMessages().header("foo").isNull();
+ getMockEndpoint("mock:result").allMessages().header("beer").isNull();
+
+ getMockEndpoint("mock:split").expectedBodiesReceived("A", "B", "C");
+ // should have the bar header because multicast uses UseLatestAggregationStrategy by default
+ getMockEndpoint("mock:split").expectedHeaderReceived("bar", 123);
+ // should NOT have the foo header because multicast uses UseLatestAggregationStrategy by default
+ getMockEndpoint("mock:split").allMessages().header("foo").isNull();
+
+ getMockEndpoint("mock:split2").expectedBodiesReceived("A", "B", "C");
+ // should have the bar header from the previous multicast (which value 123)
+ getMockEndpoint("mock:split2").expectedHeaderReceived("bar", 123);
+ // should have the beer which was set on the 2nd multicast
+ getMockEndpoint("mock:split2").expectedHeaderReceived("beer", "Carlsberg");
+ // and no foo header at all
+ getMockEndpoint("mock:split2").allMessages().header("foo").isNull();
+
+ template.sendBody("direct:start", "A,B,C");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .split(body().tokenize(","))
+ .multicast()
+ .setHeader("foo", constant("ABC"))
+ .setHeader("bar", constant(123))
+ .end()
+ .to("log:split?showHeaders=true", "mock:split")
+ .multicast()
+ .setHeader("bar", constant(456))
+ .setHeader("beer", constant("Carlsberg"))
+ .end()
+ .to("log:split2?showHeaders=true", "mock:split2")
+ .end()
+ .to("log:result?showHeaders=true", "mock:result");
+ }
+ };
+ }
+}
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java?rev=1034759&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java Sat Nov 13 13:42:45 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SplitterWithMulticastTest extends ContextTestSupport {
+
+ public void testSplitterWithMulticast() throws Exception {
+ getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C");
+ // should get the original input message without any headers etc
+ getMockEndpoint("mock:result").allMessages().header("bar").isNull();
+ getMockEndpoint("mock:result").allMessages().header("foo").isNull();
+
+ getMockEndpoint("mock:split").expectedBodiesReceived("A", "B", "C");
+ // should have the bar header because multicast uses UseLatestAggregationStrategy by default
+ getMockEndpoint("mock:split").expectedHeaderReceived("bar", 123);
+ // should NOT have the foo header because multicast uses UseLatestAggregationStrategy by default
+ getMockEndpoint("mock:split").allMessages().header("foo").isNull();
+
+ template.sendBody("direct:start", "A,B,C");
+
+ assertMockEndpointsSatisfied();
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .split(body().tokenize(","))
+ .multicast()
+ .setHeader("foo", constant("ABC"))
+ .setHeader("bar", constant(123))
+ .end()
+ .to("log:split?showHeaders=true", "mock:split")
+ .end()
+ .to("log:result?showHeaders=true", "mock:result");
+ }
+ };
+ }
+}
Modified: camel/trunk/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/pom.xml?rev=1034759&r1=1034758&r2=1034759&view=diff
==============================================================================
--- camel/trunk/pom.xml (original)
+++ camel/trunk/pom.xml Sat Nov 13 13:42:45 2010
@@ -507,7 +507,7 @@
<profiles>
<profile>
- <!-- really just used for the eclispe setup as eclipse will stick JSE-15 as the compliance
+ <!-- really just used for the eclipse setup as eclipse will stick JSE-15 as the compliance
level if we don't reconfigure the compiler plugin with 1.6. When it does that, things
like JAXB and other built in JDK things won't be found -->
<id>jdk15</id>