You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2010/11/13 14:42:46 UTC

svn commit: r1034759 - in /camel/trunk: ./ camel-core/src/main/java/org/apache/camel/processor/ camel-core/src/test/java/org/apache/camel/processor/

Author: davsclaus
Date: Sat Nov 13 13:42:45 2010
New Revision: 1034759

URL: http://svn.apache.org/viewvc?rev=1034759&view=rev
Log:
CAMEL-3333: Fixed splitter with nested multicast having its UseOriginalAggregationStrategy mistakenly removed by the nested multicast.

Added:
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java
    camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
    camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
    camel/trunk/pom.xml

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java?rev=1034759&r1=1034758&r2=1034759&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/MulticastProcessor.java Sat Nov 13 13:42:45 2010
@@ -18,17 +18,17 @@ package org.apache.camel.processor;
 
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletionService;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorCompletionService;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
@@ -38,12 +38,10 @@ import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Endpoint;
 import org.apache.camel.Exchange;
-import org.apache.camel.ExchangeTimedOutException;
 import org.apache.camel.Navigate;
 import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.builder.ErrorHandlerBuilder;
-import org.apache.camel.impl.DefaultExchange;
 import org.apache.camel.impl.ServiceSupport;
 import org.apache.camel.impl.converter.AsyncProcessorTypeConverter;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -51,6 +49,7 @@ import org.apache.camel.processor.aggreg
 import org.apache.camel.spi.RouteContext;
 import org.apache.camel.spi.TracedRouteNodes;
 import org.apache.camel.util.AsyncProcessorHelper;
+import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.EventHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -542,7 +541,7 @@ public class MulticastProcessor extends 
      */
     protected void doDone(Exchange original, Exchange subExchange, AsyncCallback callback, boolean doneSync) {
         // cleanup any per exchange aggregation strategy
-        original.removeProperty(Exchange.AGGREGATION_STRATEGY);
+        removeAggregationStrategyFromExchange(original);
         if (original.getException() != null) {
             // multicast uses error handling on its output processors and they have tried to redeliver
             // so we shall signal back to the other error handlers that we are exhausted and they should not
@@ -660,7 +659,11 @@ public class MulticastProcessor extends 
 
         // prefer to use per Exchange aggregation strategy over a global strategy
         if (exchange != null) {
-            answer = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, AggregationStrategy.class);
+            Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
+            Map<Object, AggregationStrategy> map = CastUtils.cast(property);
+            if (map != null) {
+                answer = map.get(this);
+            }
         }
         if (answer == null) {
             // fallback to global strategy
@@ -670,6 +673,40 @@ public class MulticastProcessor extends 
     }
 
     /**
+     * Stores the given {@link AggregationStrategy} in the per-exchange strategy map, keyed by this processor.
+     *
+     * @param exchange  the exchange holding the map under {@link Exchange#AGGREGATION_STRATEGY} (created if absent)
+     * @param aggregationStrategy  the strategy to associate with this processor on the exchange
+     */
+    protected void setAggregationStrategyOnExchange(Exchange exchange, AggregationStrategy aggregationStrategy) {
+        Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
+        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
+        if (map == null) {
+            map = new HashMap<Object, AggregationStrategy>();
+        }
+        // key the strategy by this processor so that several processors (e.g. a splitter
+        // and a nested multicast) can each keep their own strategy on the same exchange
+        map.put(this, aggregationStrategy);
+        exchange.setProperty(Exchange.AGGREGATION_STRATEGY, map);
+    }
+
+    /**
+     * Removes the {@link AggregationStrategy} stored for this processor from the {@link Exchange}; this must be
+     * done after use. Strategies stored under other keys are left in place, and a missing map is a no-op.
+     *
+     * @param exchange the current exchange
+     */
+    protected void removeAggregationStrategyFromExchange(Exchange exchange) {
+        Map property = exchange.getProperty(Exchange.AGGREGATION_STRATEGY, Map.class);
+        Map<Object, AggregationStrategy> map = CastUtils.cast(property);
+        if (map == null) {
+            return;
+        }
+        // remove only the entry keyed by this processor; the map itself stays on the exchange
+        map.remove(this);
+    }
+
+    /**
      * Is the multicast processor working in streaming mode?
      * <p/>
      * In streaming mode:

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java?rev=1034759&r1=1034758&r2=1034759&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/processor/Splitter.java Sat Nov 13 13:42:45 2010
@@ -88,7 +88,7 @@ public class Splitter extends MulticastP
         // to ensure it supports async routing
         if (strategy == null) {
             UseOriginalAggregationStrategy original = new UseOriginalAggregationStrategy(exchange, true);
-            exchange.setProperty(Exchange.AGGREGATION_STRATEGY, original);
+            setAggregationStrategyOnExchange(exchange, original);
         }
 
         return super.process(exchange, callback);

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java?rev=1034759&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithDualMulticastTest.java Sat Nov 13 13:42:45 2010
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SplitterWithDualMulticastTest extends ContextTestSupport {
+
+    public void testSplitterWithDualMulticast() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C");
+        // should get the original input message back, without any of the headers set inside the split
+        getMockEndpoint("mock:result").allMessages().header("bar").isNull();
+        getMockEndpoint("mock:result").allMessages().header("foo").isNull();
+        getMockEndpoint("mock:result").allMessages().header("beer").isNull();
+
+        getMockEndpoint("mock:split").expectedBodiesReceived("A", "B", "C");
+        // should have the bar header because multicast uses UseLatestAggregationStrategy by default
+        getMockEndpoint("mock:split").expectedHeaderReceived("bar", 123);
+        // should NOT have the foo header because multicast uses UseLatestAggregationStrategy by default
+        getMockEndpoint("mock:split").allMessages().header("foo").isNull();
+
+        getMockEndpoint("mock:split2").expectedBodiesReceived("A", "B", "C");
+        // should have the bar header from the previous multicast (which has the value 123), not 456 from the 2nd one
+        getMockEndpoint("mock:split2").expectedHeaderReceived("bar", 123);
+        // should have the beer header which was set on the 2nd multicast
+        getMockEndpoint("mock:split2").expectedHeaderReceived("beer", "Carlsberg");
+        // and no foo header at all
+        getMockEndpoint("mock:split2").allMessages().header("foo").isNull();
+
+        template.sendBody("direct:start", "A,B,C");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split(body().tokenize(","))
+                        .multicast()
+                            .setHeader("foo", constant("ABC"))
+                            .setHeader("bar", constant(123))
+                        .end()
+                        .to("log:split?showHeaders=true", "mock:split")
+                        .multicast()
+                            .setHeader("bar", constant(456))
+                            .setHeader("beer", constant("Carlsberg"))
+                        .end()
+                        .to("log:split2?showHeaders=true", "mock:split2")
+                    .end()
+                    .to("log:result?showHeaders=true", "mock:result");
+            }
+        };
+    }
+}

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java?rev=1034759&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitterWithMulticastTest.java Sat Nov 13 13:42:45 2010
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.builder.RouteBuilder;
+
+/**
+ * @version $Revision$
+ */
+public class SplitterWithMulticastTest extends ContextTestSupport {
+
+    public void testSplitterWithMulticast() throws Exception {
+        getMockEndpoint("mock:result").expectedBodiesReceived("A,B,C");
+        // should get the original input message back, without any of the headers set inside the split
+        getMockEndpoint("mock:result").allMessages().header("bar").isNull();
+        getMockEndpoint("mock:result").allMessages().header("foo").isNull();
+
+        getMockEndpoint("mock:split").expectedBodiesReceived("A", "B", "C");
+        // should have the bar header (value 123) because multicast uses UseLatestAggregationStrategy by default
+        getMockEndpoint("mock:split").expectedHeaderReceived("bar", 123);
+        // should NOT have the foo header because multicast uses UseLatestAggregationStrategy by default
+        getMockEndpoint("mock:split").allMessages().header("foo").isNull();
+
+        template.sendBody("direct:start", "A,B,C");
+
+        assertMockEndpointsSatisfied();
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .split(body().tokenize(","))
+                        .multicast()
+                            .setHeader("foo", constant("ABC"))
+                            .setHeader("bar", constant(123))
+                        .end()
+                        .to("log:split?showHeaders=true", "mock:split")
+                    .end()
+                    .to("log:result?showHeaders=true", "mock:result");
+            }
+        };
+    }
+}

Modified: camel/trunk/pom.xml
URL: http://svn.apache.org/viewvc/camel/trunk/pom.xml?rev=1034759&r1=1034758&r2=1034759&view=diff
==============================================================================
--- camel/trunk/pom.xml (original)
+++ camel/trunk/pom.xml Sat Nov 13 13:42:45 2010
@@ -507,7 +507,7 @@
 
   <profiles>
     <profile>
-      <!-- really just used for the eclispe setup as eclipse will stick JSE-15 as the compliance
+      <!-- really just used for the eclipse setup as eclipse will stick JSE-15 as the compliance
       level if we don't reconfigure the compiler plugin with 1.6.  When it does that, things 
       like JAXB and other built in JDK things won't be found -->
       <id>jdk15</id>