You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/23 08:56:58 UTC

svn commit: r777808 [2/2] - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ camel-core/src/main/java/org/apache/camel/component/file/ camel-core/src/main/java/org/apache/camel/component/seda/ camel-core/src/main/java/org/apache/camel/impl/ ...

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java Sat May 23 06:56:56 2009
@@ -48,9 +48,9 @@
         mock.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
 
         // test header keys
-        mock.message(0).header(Exchange.FILE_BATCH_SIZE).isEqualTo(2);
-        mock.message(0).header(Exchange.FILE_BATCH_INDEX).isEqualTo(0);
-        mock.message(1).header(Exchange.FILE_BATCH_INDEX).isEqualTo(1);
+        mock.message(0).header(Exchange.BATCH_SIZE).isEqualTo(2);
+        mock.message(0).header(Exchange.BATCH_INDEX).isEqualTo(0);
+        mock.message(1).header(Exchange.BATCH_INDEX).isEqualTo(1);
 
         assertMockEndpointsSatisfied();
     }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java Sat May 23 06:56:56 2009
@@ -18,18 +18,17 @@
 
 import org.apache.camel.Exchange;
 import org.apache.camel.Header;
-import org.apache.camel.Message;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
 
 public class BodyInAggregatingStrategy implements AggregationStrategy {
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-        Exchange copy = newExchange.copy();
-        Message newIn = copy.getIn();
-        String oldBody = oldExchange.getIn().getBody(String.class);
-        String newBody = newIn.getBody(String.class);
-        newIn.setBody(oldBody + "+" + newBody);
-        return copy;
+        if (oldExchange != null) {
+            String oldBody = oldExchange.getIn().getBody(String.class);
+            String newBody = newExchange.getIn().getBody(String.class);
+            newExchange.getIn().setBody(oldBody + "+" + newBody);
+        }
+        return newExchange;
     }
 
     /**

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyOutAggregatingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyOutAggregatingStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyOutAggregatingStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyOutAggregatingStrategy.java Sat May 23 06:56:56 2009
@@ -23,10 +23,12 @@
 public class BodyOutAggregatingStrategy implements AggregationStrategy {
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-        Message newOut = newExchange.getOut();
-        String oldBody = oldExchange.getOut().getBody(String.class);
-        String newBody = newOut.getBody(String.class);
-        newOut.setBody(oldBody + "+" + newBody);
+        if (oldExchange != null) {
+            String oldBody = oldExchange.getOut().getBody(String.class);
+            String newBody = newExchange.getOut().getBody(String.class);
+            newExchange.getOut().setBody(oldBody + "+" + newBody);
+        }
+
         return newExchange;
     }
 

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java Sat May 23 06:56:56 2009
@@ -152,10 +152,16 @@
      */
     // START SNIPPET: e7  
     public static final class MyOrderAggregationStrategy implements AggregationStrategy {
+
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                return newExchange;
+            }
+
             List<OrderItem> order = new ArrayList<OrderItem>(2);
             order.add(oldExchange.getIn().getBody(OrderItem.class));
             order.add(newExchange.getIn().getBody(OrderItem.class));
+
             oldExchange.getIn().setBody(order);
             return oldExchange;
         }

Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerAsyncTest.java (from r775966, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerAsyncTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerAsyncTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r1=775966&r2=777808&rev=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerAsyncTest.java Sat May 23 06:56:56 2009
@@ -28,7 +28,7 @@
 /**
  * @version $Revision$
  */
-public class IdempotentConsumerTest extends ContextTestSupport {
+public class IdempotentConsumerAsyncTest extends ContextTestSupport {
     protected Endpoint startEndpoint;
     protected MockEndpoint resultEndpoint;
 
@@ -43,7 +43,7 @@
             public void configure() throws Exception {
                 from("direct:start").idempotentConsumer(
                         header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
-                ).to("mock:result");
+                ).async().to("mock:result");
             }
         });
         context.start();
@@ -68,7 +68,7 @@
 
                 from("direct:start").idempotentConsumer(
                         header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
-                ).process(new Processor() {
+                ).async().process(new Processor() {
                     public void process(Exchange exchange) throws Exception {
                         String id = exchange.getIn().getHeader("messageId", String.class);
                         if (id.equals("2")) {
@@ -94,7 +94,7 @@
         assertMockEndpointsSatisfied();
     }
 
-    protected void sendMessage(final Object messageId, final Object body) {
+    protected void sendMessage(final Object messageId, final Object body) throws Exception {
         template.send(startEndpoint, new Processor() {
             public void process(Exchange exchange) {
                 // now lets fire in a message
@@ -103,6 +103,9 @@
                 in.setHeader("messageId", messageId);
             }
         });
+
+        // must sleep a little as the route is async and we can be too fast
+        Thread.sleep(50);
     }
 
     @Override
@@ -113,4 +116,4 @@
         resultEndpoint = getMockEndpoint("mock:result");
     }
 
-}
+}
\ No newline at end of file

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java?rev=777808&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java Sat May 23 06:56:56 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class MulticastAnotherAggregatorTest extends ContextTestSupport {
+
+    public void testMulticastReceivesItsOwnExchangeParallelly() throws Exception {
+        sendingAMessageUsingMulticastReceivesItsOwnExchange(true);
+    }
+
+    public void testMulticastReceivesItsOwnExchangeSequentially() throws Exception {
+        sendingAMessageUsingMulticastReceivesItsOwnExchange(false);
+    }
+
+    private void sendingAMessageUsingMulticastReceivesItsOwnExchange(boolean isParallel) throws Exception {
+        MockEndpoint result = getMockEndpoint("mock:result");
+        result.expectedBodiesReceived("inputx+inputy+inputz");
+
+        String url;
+        if (isParallel) {
+            url = "direct:parallel";
+        } else {
+            url = "direct:sequential";
+        }
+
+        // use InOut
+        Exchange exchange = template.request(url, new Processor() {
+            public void process(Exchange exchange) {
+                Message in = exchange.getIn();
+                in.setBody("input");
+                in.setHeader("foo", "bar");
+            }
+        });
+
+        assertNotNull("We should get result here", exchange);
+        assertEquals("Can't get the right result", "inputx+inputy+inputz", exchange.getOut().getBody(String.class));
+
+        assertMockEndpointsSatisfied();
+    }
+
+    protected RouteBuilder createRouteBuilder() {
+
+        return new RouteBuilder() {
+            public void configure() {
+                ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
+                // START SNIPPET: example
+                // The message will be sent to the endpoints in parallel
+                from("direct:parallel")
+                    .multicast(new BodyOutAggregatingStrategy(), true).executorService(tpExecutor)
+                        .to("direct:x", "direct:y", "direct:z");
+                // Multicast the message in a sequential way
+                from("direct:sequential").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");
+
+                from("direct:x").process(new AppendingProcessor("x")).to("direct:aggregator");
+                from("direct:y").process(new AppendingProcessor("y")).to("direct:aggregator");
+                from("direct:z").process(new AppendingProcessor("z")).to("direct:aggregator");
+
+                from("direct:aggregator").aggregate(header("cheese"), new BodyInAggregatingStrategy()).
+                        completionPredicate(header(Exchange.AGGREGATED_SIZE).isEqualTo(3)).to("mock:result");
+                // END SNIPPET: example
+            }
+        };
+
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java Sat May 23 06:56:56 2009
@@ -18,6 +18,7 @@
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -36,6 +37,23 @@
         assertMockEndpointsSatisfied();
     }
 
+    public void testMulticastParallel() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+        mock.whenAnyExchangeReceived(new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                // they should all be BA as B is faster than A
+                assertEquals("BA", exchange.getIn().getBody(String.class));
+            }
+        });
+
+        for (int i = 0; i < 20; i++) {
+            template.sendBody("direct:start", "Hello");
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -44,6 +62,10 @@
                 from("direct:start")
                     .multicast(new AggregationStrategy() {
                         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                            if (oldExchange == null) {
+                                return newExchange;
+                            }
+
                             String body = oldExchange.getIn().getBody(String.class);
                             oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                             return oldExchange;

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java?rev=777808&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java Sat May 23 06:56:56 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class MulticastParallelStressTest extends ContextTestSupport {
+
+    public void testTwoMulticast() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedBodiesReceived("ABCD", "ABCD");
+        mock.expectsAscending().header("id");
+
+        template.sendBodyAndHeader("direct:start", "", "id", 1);
+        template.sendBodyAndHeader("direct:start", "", "id", 2);
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testMoreMulticast() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+        mock.expectsAscending().header("id");
+
+        for (int i = 0; i < 20; i++) {
+            template.sendBodyAndHeader("direct:start", "", "id", i);
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+    public void testConcurrencyParallelMulticast() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+        // this time we cannot expect in order but there should be no duplicates
+        mock.expectsNoDuplicates(header("id"));
+
+        ExecutorService executor = Executors.newFixedThreadPool(10);
+        for (int i = 0; i < 20; i++) {
+            final int index = i;
+            executor.submit(new Callable<Object>() {
+                public Object call() throws Exception {
+                    template.sendBodyAndHeader("direct:start", "", "id", index);
+                    return null;
+                }
+            });
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                from("direct:start")
+                    .multicast(new AggregationStrategy() {
+                            public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                                if (oldExchange == null) {
+                                    return newExchange;
+                                }
+
+                                String body = oldExchange.getIn().getBody(String.class);
+                                oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
+                                return oldExchange;
+                            }
+                        }).parallelProcessing()
+                            .to("direct:a", "direct:b", "direct:c", "direct:d")
+                    // use end to indicate end of multicast route
+                    .end()
+                    .to("mock:result");
+
+                from("direct:a").delay(20).setBody(body().append("A"));
+
+                from("direct:b").setBody(body().append("B"));
+
+                from("direct:c").delay(50).setBody(body().append("C"));
+
+                from("direct:d").delay(10).setBody(body().append("D"));
+            }
+        };
+    }
+
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java Sat May 23 06:56:56 2009
@@ -18,6 +18,7 @@
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -27,7 +28,7 @@
  */
 public class MulticastParallelTest extends ContextTestSupport {
 
-    public void testMulticastParallel() throws Exception {
+    public void testSingleMulticastParallel() throws Exception {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedBodiesReceived("AB");
 
@@ -36,6 +37,23 @@
         assertMockEndpointsSatisfied();
     }
 
+    public void testMulticastParallel() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(20);
+        mock.whenAnyExchangeReceived(new Processor() {
+            public void process(Exchange exchange) throws Exception {
+                // they should all be AB even though A is slower than B
+                assertEquals("AB", exchange.getIn().getBody(String.class));
+            }
+        });
+
+        for (int i = 0; i < 20; i++) {
+            template.sendBody("direct:start", "Hello");
+        }
+
+        assertMockEndpointsSatisfied();
+    }
+
     @Override
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
@@ -44,6 +62,10 @@
                 from("direct:start")
                     .multicast(new AggregationStrategy() {
                             public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                                if (oldExchange == null) {
+                                    return newExchange;
+                                }
+
                                 String body = oldExchange.getIn().getBody(String.class);
                                 oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
                                 return oldExchange;

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyAggregationStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyAggregationStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyAggregationStrategy.java Sat May 23 06:56:56 2009
@@ -24,22 +24,23 @@
  * @version $Revision$
 */
 public class MyAggregationStrategy extends UseLatestAggregationStrategy {
+
     @Override
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         Exchange result = super.aggregate(oldExchange, newExchange);
-        Integer old = (Integer) oldExchange.getProperty("aggregated");
-        if (old == null) {
-            old = 1;
+        if (oldExchange == null) {
+            result.setProperty("aggregated", 1);
+        } else {
+            Integer old = oldExchange.getProperty("aggregated", Integer.class);
+            result.setProperty("aggregated", old + 1);
         }
-        result.setProperty("aggregated", old + 1);
         return result;
     }
 
     /**
      * An expression used to determine if the aggregation is complete
      */
-    public boolean isCompleted(@Header("aggregated")
-                               Integer aggregated) {
+    public boolean isCompleted(@Header("aggregated") Integer aggregated) {
         if (aggregated == null) {
             return false;
         }

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java Sat May 23 06:56:56 2009
@@ -117,6 +117,12 @@
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
             // put order together in old exchange by adding the order from new exchange
 
+            if (oldExchange == null) {
+                // the first time we aggregate we only have the new exchange,
+                // so we just return it
+                return newExchange;
+            }
+
             // copy from OUT as we use InOut pattern
             String orders = oldExchange.getOut().getBody(String.class);
             String newLine = newExchange.getOut().getBody(String.class);

Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java?rev=777808&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java Sat May 23 06:56:56 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+public class AggregatorConcurrencyTest extends ContextTestSupport {
+
+    private static final transient Log LOG = LogFactory.getLog(AggregatorConcurrencyTest.class);
+
+    private static final AtomicInteger COUNTER = new AtomicInteger(0);
+    private static final AtomicInteger SUM = new AtomicInteger(0);
+
+    private final int size = 100;
+    private final String uri = "direct:start";
+
+    public void testAggregateConcurrency() throws Exception {
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(1);
+
+        int total = 0;
+        ExecutorService service = Executors.newFixedThreadPool(20);
+        for (int i = 0; i < size; i++) {
+            final int count = i;
+            total += i;
+            service.submit(new Callable<Object>() {
+                public Object call() throws Exception {
+                    template.sendBodyAndHeader(uri, "Hello World", "index", count);
+                    return null;
+                }
+            });
+        }
+        mock.expectedBodiesReceived(total);
+        mock.expectedHeaderReceived("total", total);
+        mock.expectedPropertyReceived(Exchange.AGGREGATED_SIZE, size);
+
+        assertMockEndpointsSatisfied();
+
+        assertEquals(100, COUNTER.get());
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from(uri)
+                    .aggregate(constant(true), new AggregationStrategy() {
+                        public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+                            Exchange answer = oldExchange != null ? oldExchange : newExchange;
+                            COUNTER.getAndIncrement();
+
+                            Integer newIndex = newExchange.getIn().getHeader("index", Integer.class);
+                            int total = SUM.addAndGet(newIndex);
+                            answer.getIn().setHeader("total", total);
+
+                            LOG.debug("Index: " + newIndex + ". Total so far: " + total);
+                            return answer;
+                        }
+                    }).batchTimeout(60000).completionPredicate(property(Exchange.AGGREGATED_SIZE).isEqualTo(100))
+                    .to("direct:foo");
+
+                from("direct:foo").setBody().header("total").to("mock:result");
+            }
+        };
+    }
+}

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java Sat May 23 06:56:56 2009
@@ -23,7 +23,6 @@
 
 import org.apache.camel.ContextTestSupport;
 import org.apache.camel.Exchange;
-import org.apache.camel.Message;
 import org.apache.camel.Processor;
 import org.apache.camel.ProducerTemplate;
 import org.apache.camel.builder.RouteBuilder;
@@ -76,82 +75,62 @@
 
         return new RouteBuilder() {
             AggregationStrategy surnameAggregator = new AggregationStrategy() {
-                public Exchange aggregate(Exchange oldExchange,
-                        Exchange newExchange) {
-
+                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                     debugIn("Surname Aggregator", oldExchange, newExchange);
 
-                    Message oldIn = oldExchange.getIn();
-                    Message newIn = newExchange.getIn();
-
-                    List<String> brothers = null;
-                    if (oldIn.getBody() instanceof List) {
+                    Exchange answer = newExchange;
 
-                        brothers = oldIn.getBody(List.class);
-                        brothers.add(newIn.getBody(String.class));
+                    if (oldExchange != null) {
+                        List<String> brothers = oldExchange.getIn().getBody(List.class);
+                        brothers.add(newExchange.getIn().getBody(String.class));
+                        answer = oldExchange;
                     } else {
+                        List<String>brothers = new ArrayList<String>();
+                        brothers.add(newExchange.getIn().getBody(String.class));
+                        newExchange.getIn().setBody(brothers);
+                    }
 
-                        brothers = new ArrayList<String>();
-                        brothers.add(oldIn.getBody(String.class));
-                        brothers.add(newIn.getBody(String.class));
-                        oldExchange.getIn().setBody(brothers);
-                    } // else
-
-                    debugOut("Surname Aggregator", oldExchange);
+                    debugOut("Surname Aggregator", answer);
 
-                    return oldExchange;
+                    return answer;
                 }
             };
-            AggregationStrategy brothersAggregator = new AggregationStrategy() {
-                public Exchange aggregate(Exchange oldExchange,
-                        Exchange newExchange) {
 
+            @SuppressWarnings("unchecked")
+            AggregationStrategy brothersAggregator = new AggregationStrategy() {
+                public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                     debugIn("Brothers Aggregator", oldExchange, newExchange);
 
-                    Message oldIn = oldExchange.getIn();
-                    Message newIn = newExchange.getIn();
+                    Exchange answer = newExchange;
 
-                    Map<String, List> brothers = null;
-                    if (oldIn.getBody() instanceof Map) {
-
-                        brothers = oldIn.getBody(Map.class);
-                        brothers.put(newIn.getHeader(SURNAME_HEADER,
-                                String.class), newIn.getBody(List.class));
+                    if (oldExchange != null) {
+                        Map<String, List> brothers = oldExchange.getIn().getBody(Map.class);
+                        brothers.put(newExchange.getIn().getHeader(SURNAME_HEADER, String.class), newExchange.getIn().getBody(List.class));
+                        answer = oldExchange;
                     } else {
+                        Map<String, List> brothers = new HashMap<String, List>();
+                        brothers.put(newExchange.getIn().getHeader(SURNAME_HEADER, String.class), newExchange.getIn().getBody(List.class));
+                        newExchange.getIn().setBody(brothers);
+                    }
 
-                        brothers = new HashMap<String, List>();
-                        brothers.put(oldIn.getHeader(SURNAME_HEADER, String.class),
-                                oldIn.getBody(List.class));
-                        brothers.put(newIn.getHeader(SURNAME_HEADER,
-                                String.class), newIn.getBody(List.class));
-                        oldExchange.getIn().setBody(brothers);
-                    } // else
-
-                    debugOut("Brothers Aggregator", oldExchange);
+                    debugOut("Brothers Aggregator", answer);
 
-                    return oldExchange;
+                    return answer;
                 }
             };
 
-            private void debugIn(String stringId, Exchange oldExchange,
-                    Exchange newExchange) {
-
-                log.debug(stringId + " old headers in: "
-                        + oldExchange.getIn().getHeaders());
-                log.debug(stringId + " old body in: "
-                        + oldExchange.getIn().getBody());
-                log.debug(stringId + " new headers in: "
-                        + newExchange.getIn().getHeaders());
-                log.debug(stringId + " new body in: "
-                        + newExchange.getIn().getBody());
+            private void debugIn(String stringId, Exchange oldExchange, Exchange newExchange) {
+                if (oldExchange != null) {
+                    log.debug(stringId + " old headers in: " + oldExchange.getIn().getHeaders());
+                    log.debug(stringId + " old body in: " + oldExchange.getIn().getBody());
+                }
+                log.debug(stringId + " new headers in: " + newExchange.getIn().getHeaders());
+                log.debug(stringId + " new body in: " + newExchange.getIn().getBody());
             }
 
             private void debugOut(String stringId, Exchange exchange) {
-
-                log.debug(stringId + " old headers out: "
-                        + exchange.getIn().getHeaders());
-                log.debug(stringId + " old body out: "
-                        + exchange.getIn().getBody());
+                log.debug(stringId + " old headers out: " + exchange.getIn().getHeaders());
+                log.debug(stringId + " old body out: " + exchange.getIn().getBody());
             }
 
             @Override

Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java Sat May 23 06:56:56 2009
@@ -67,6 +67,10 @@
     private static class MyAggregationStrategy implements AggregationStrategy {
 
         public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+            if (oldExchange == null) {
+                // the first time we only have the new exchange so it wins the first round
+                return newExchange;
+            }
             int oldPrice = oldExchange.getIn().getBody(Integer.class);
             int newPrice = newExchange.getIn().getBody(Integer.class);
             // return the "winner" that has the highest price

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java Sat May 23 06:56:56 2009
@@ -37,12 +37,6 @@
         this.binding = binding;
     }
 
-    public JmsExchange(CamelContext context, ExchangePattern pattern, JmsBinding binding, Message message) {
-        super(context, pattern);
-        this.binding = binding;
-        setIn(new JmsMessage(message));
-    }
-
     public JmsExchange(JmsEndpoint endpoint, ExchangePattern pattern, JmsBinding binding, Message message) {
         this(endpoint, pattern, binding);
         setIn(new JmsMessage(message));

Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java Sat May 23 06:56:56 2009
@@ -65,13 +65,10 @@
         resultEndpoint.assertIsSatisfied(8000);
     }
 
-
-
     protected void sendExchange(String uri, final Object expectedBody) {
         template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
     }
 
-
     protected CamelContext createCamelContext() throws Exception {
         CamelContext camelContext = super.createCamelContext();
 
@@ -103,15 +100,17 @@
                 from("jms:queue:point3").process(new MyProcessor()).to("jms:queue:reply");
                 from("jms:queue:reply").aggregate(header("cheese"), new AggregationStrategy() {
                     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
-                        Exchange copy = newExchange.copy();
+                        if (oldExchange == null) {
+                            return newExchange;
+                        }
+
                         LOG.info("try to aggregating the message ");
-                        Integer old = (Integer) oldExchange.getProperty("aggregated");
+                        Integer old = oldExchange.getProperty("aggregated", Integer.class);
                         if (old == null) {
                             old = 1;
                         }
-                        Exchange result = copy;
-                        result.setProperty("aggregated", old + 1);
-                        return result;
+                        oldExchange.setProperty("aggregated", old + 1);
+                        return oldExchange;
                     }
                 }).completionPredicate(header("aggregated").isEqualTo(3))
                 .to("mock:reply");

Modified: camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedStrategy.java (original)
+++ camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedStrategy.java Sat May 23 06:56:56 2009
@@ -30,6 +30,9 @@
     protected final transient Log log = LogFactory.getLog(AggregateRssFeedStrategy.class);    
     
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        if (oldExchange == null) {
+            return newExchange;
+        }
         SyndFeed oldFeed = oldExchange.getIn().getBody(SyndFeed.class);
         SyndFeed newFeed = newExchange.getIn().getBody(SyndFeed.class);
         if (oldFeed != null && newFeed != null) {                

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/MyAggregator.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/MyAggregator.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/MyAggregator.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/MyAggregator.java Sat May 23 06:56:56 2009
@@ -32,6 +32,10 @@
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         // lets append the old body to the new body
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
         String body = oldExchange.getIn().getBody(String.class);
         if (body != null) {
             Message newIn = newExchange.getIn();

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java Sat May 23 06:56:56 2009
@@ -17,12 +17,10 @@
 package org.apache.camel.spring.processor;
 
 import org.apache.camel.CamelContext;
-import org.apache.camel.processor.MultiCastAggregatorTest;
-
-
+import org.apache.camel.processor.MulticastAnotherAggregatorTest;
 import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
 
-public class SpringMulticastAggregatorTest extends MultiCastAggregatorTest {
+public class SpringMulticastAggregatorTest extends MulticastAnotherAggregatorTest {
 
     protected CamelContext createCamelContext() throws Exception {
         return createSpringCamelContext(this, "org/apache/camel/spring/processor/multicastAggregator.xml");

Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/LowestQuoteAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/LowestQuoteAggregationStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/LowestQuoteAggregationStrategy.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/LowestQuoteAggregationStrategy.java Sat May 23 06:56:56 2009
@@ -22,6 +22,11 @@
 // START SNIPPET: e1
 public class LowestQuoteAggregationStrategy implements AggregationStrategy {
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // the first time we only have the new exchange
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
         if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
             return oldExchange;
         } else {

Modified: camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java (original)
+++ camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java Sat May 23 06:56:56 2009
@@ -36,6 +36,11 @@
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
         LOG.debug("Get the exchange to aggregate, older: " + oldExchange + " newer:" + newExchange);
 
+        // the first time we only have the new exchange
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
         Message oldMessage;
         Message newMessage;
         if (aggregatingOutMessage) {

Modified: camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/BankResponseAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/BankResponseAggregationStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/BankResponseAggregationStrategy.java (original)
+++ camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/BankResponseAggregationStrategy.java Sat May 23 06:56:56 2009
@@ -26,6 +26,11 @@
     public static final String BANK_QUOTE = "bank_quote";
 
     public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+        // the first time we only have the new exchange
+        if (oldExchange == null) {
+            return newExchange;
+        }
+
         // Get the bank quote instance from the exchange
         BankQuote oldQuote = oldExchange.getProperty(BANK_QUOTE, BankQuote.class);
         // Get the oldQute from out message body if we can't get it from the exchange
@@ -50,7 +55,6 @@
         result.getOut().setBody("The best rate is " + bankQuote.toString());
 
         return result;
-
     }
 
 }