You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/23 08:56:58 UTC
svn commit: r777808 [2/2] - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
camel-core/src/main/java/org/apache/camel/component/file/
camel-core/src/main/java/org/apache/camel/component/seda/
camel-core/src/main/java/org/apache/camel/impl/ ...
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/component/file/FileConsumerBatchTest.java Sat May 23 06:56:56 2009
@@ -48,9 +48,9 @@
mock.expectedBodiesReceivedInAnyOrder("Hello World", "Bye World");
// test header keys
- mock.message(0).header(Exchange.FILE_BATCH_SIZE).isEqualTo(2);
- mock.message(0).header(Exchange.FILE_BATCH_INDEX).isEqualTo(0);
- mock.message(1).header(Exchange.FILE_BATCH_INDEX).isEqualTo(1);
+ mock.message(0).header(Exchange.BATCH_SIZE).isEqualTo(2);
+ mock.message(0).header(Exchange.BATCH_INDEX).isEqualTo(0);
+ mock.message(1).header(Exchange.BATCH_INDEX).isEqualTo(1);
assertMockEndpointsSatisfied();
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyInAggregatingStrategy.java Sat May 23 06:56:56 2009
@@ -18,18 +18,17 @@
import org.apache.camel.Exchange;
import org.apache.camel.Header;
-import org.apache.camel.Message;
import org.apache.camel.processor.aggregate.AggregationStrategy;
public class BodyInAggregatingStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- Exchange copy = newExchange.copy();
- Message newIn = copy.getIn();
- String oldBody = oldExchange.getIn().getBody(String.class);
- String newBody = newIn.getBody(String.class);
- newIn.setBody(oldBody + "+" + newBody);
- return copy;
+ if (oldExchange != null) {
+ String oldBody = oldExchange.getIn().getBody(String.class);
+ String newBody = newExchange.getIn().getBody(String.class);
+ newExchange.getIn().setBody(oldBody + "+" + newBody);
+ }
+ return newExchange;
}
/**
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyOutAggregatingStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyOutAggregatingStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyOutAggregatingStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/BodyOutAggregatingStrategy.java Sat May 23 06:56:56 2009
@@ -23,10 +23,12 @@
public class BodyOutAggregatingStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- Message newOut = newExchange.getOut();
- String oldBody = oldExchange.getOut().getBody(String.class);
- String newBody = newOut.getBody(String.class);
- newOut.setBody(oldBody + "+" + newBody);
+ if (oldExchange != null) {
+ String oldBody = oldExchange.getOut().getBody(String.class);
+ String newBody = newExchange.getOut().getBody(String.class);
+ newExchange.getOut().setBody(oldBody + "+" + newBody);
+ }
+
return newExchange;
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/ComposedMessageProcessorTest.java Sat May 23 06:56:56 2009
@@ -152,10 +152,16 @@
*/
// START SNIPPET: e7
public static final class MyOrderAggregationStrategy implements AggregationStrategy {
+
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
List<OrderItem> order = new ArrayList<OrderItem>(2);
order.add(oldExchange.getIn().getBody(OrderItem.class));
order.add(newExchange.getIn().getBody(OrderItem.class));
+
oldExchange.getIn().setBody(order);
return oldExchange;
}
Copied: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerAsyncTest.java (from r775966, camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerAsyncTest.java?p2=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerAsyncTest.java&p1=camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java&r1=775966&r2=777808&rev=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/IdempotentConsumerAsyncTest.java Sat May 23 06:56:56 2009
@@ -28,7 +28,7 @@
/**
* @version $Revision$
*/
-public class IdempotentConsumerTest extends ContextTestSupport {
+public class IdempotentConsumerAsyncTest extends ContextTestSupport {
protected Endpoint startEndpoint;
protected MockEndpoint resultEndpoint;
@@ -43,7 +43,7 @@
public void configure() throws Exception {
from("direct:start").idempotentConsumer(
header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).to("mock:result");
+ ).async().to("mock:result");
}
});
context.start();
@@ -68,7 +68,7 @@
from("direct:start").idempotentConsumer(
header("messageId"), MemoryIdempotentRepository.memoryIdempotentRepository(200)
- ).process(new Processor() {
+ ).async().process(new Processor() {
public void process(Exchange exchange) throws Exception {
String id = exchange.getIn().getHeader("messageId", String.class);
if (id.equals("2")) {
@@ -94,7 +94,7 @@
assertMockEndpointsSatisfied();
}
- protected void sendMessage(final Object messageId, final Object body) {
+ protected void sendMessage(final Object messageId, final Object body) throws Exception {
template.send(startEndpoint, new Processor() {
public void process(Exchange exchange) {
// now lets fire in a message
@@ -103,6 +103,9 @@
in.setHeader("messageId", messageId);
}
});
+
+ // must sleep a little as the route is async and we can be to fast
+ Thread.sleep(50);
}
@Override
@@ -113,4 +116,4 @@
resultEndpoint = getMockEndpoint("mock:result");
}
-}
+}
\ No newline at end of file
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java?rev=777808&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java Sat May 23 06:56:56 2009
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Message;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+
+public class MulticastAnotherAggregatorTest extends ContextTestSupport {
+
+ public void testMulticastReceivesItsOwnExchangeParallelly() throws Exception {
+ sendingAMessageUsingMulticastReceivesItsOwnExchange(true);
+ }
+
+ public void testMulticastReceivesItsOwnExchangeSequentially() throws Exception {
+ sendingAMessageUsingMulticastReceivesItsOwnExchange(false);
+ }
+
+ private void sendingAMessageUsingMulticastReceivesItsOwnExchange(boolean isParallel) throws Exception {
+ MockEndpoint result = getMockEndpoint("mock:result");
+ result.expectedBodiesReceived("inputx+inputy+inputz");
+
+ String url;
+ if (isParallel) {
+ url = "direct:parallel";
+ } else {
+ url = "direct:sequential";
+ }
+
+ // use InOut
+ Exchange exchange = template.request(url, new Processor() {
+ public void process(Exchange exchange) {
+ Message in = exchange.getIn();
+ in.setBody("input");
+ in.setHeader("foo", "bar");
+ }
+ });
+
+ assertNotNull("We should get result here", exchange);
+ assertEquals("Can't get the right result", "inputx+inputy+inputz", exchange.getOut().getBody(String.class));
+
+ assertMockEndpointsSatisfied();
+ }
+
+ protected RouteBuilder createRouteBuilder() {
+
+ return new RouteBuilder() {
+ public void configure() {
+ ThreadPoolExecutor tpExecutor = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(10));
+ // START SNIPPET: example
+ // The message will be sent parallelly to the endpoints
+ from("direct:parallel")
+ .multicast(new BodyOutAggregatingStrategy(), true).executorService(tpExecutor)
+ .to("direct:x", "direct:y", "direct:z");
+ // Multicast the message in a sequential way
+ from("direct:sequential").multicast(new BodyOutAggregatingStrategy()).to("direct:x", "direct:y", "direct:z");
+
+ from("direct:x").process(new AppendingProcessor("x")).to("direct:aggregator");
+ from("direct:y").process(new AppendingProcessor("y")).to("direct:aggregator");
+ from("direct:z").process(new AppendingProcessor("z")).to("direct:aggregator");
+
+ from("direct:aggregator").aggregate(header("cheese"), new BodyInAggregatingStrategy()).
+ completionPredicate(header(Exchange.AGGREGATED_SIZE).isEqualTo(3)).to("mock:result");
+ // END SNIPPET: example
+ }
+ };
+
+ }
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastAnotherAggregatorTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStreamingTest.java Sat May 23 06:56:56 2009
@@ -18,6 +18,7 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -36,6 +37,23 @@
assertMockEndpointsSatisfied();
}
+ public void testMulticastParallel() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+ mock.whenAnyExchangeReceived(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ // they should all be BA as B is faster than A
+ assertEquals("BA", exchange.getIn().getBody(String.class));
+ }
+ });
+
+ for (int i = 0; i < 20; i++) {
+ template.sendBody("direct:start", "Hello");
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -44,6 +62,10 @@
from("direct:start")
.multicast(new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
String body = oldExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
return oldExchange;
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java?rev=777808&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java Sat May 23 06:56:56 2009
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+
+/**
+ * @version $Revision$
+ */
+public class MulticastParallelStressTest extends ContextTestSupport {
+
+ public void testTwoMulticast() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedBodiesReceived("ABCD", "ABCD");
+ mock.expectsAscending().header("id");
+
+ template.sendBodyAndHeader("direct:start", "", "id", 1);
+ template.sendBodyAndHeader("direct:start", "", "id", 2);
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testMoreMulticast() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+ mock.expectsAscending().header("id");
+
+ for (int i = 0; i < 20; i++) {
+ template.sendBodyAndHeader("direct:start", "", "id", i);
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+ public void testConcurrencyParallelMulticast() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+ // this time we cannot expect in order but there should be no duplicates
+ mock.expectsNoDuplicates(header("id"));
+
+ ExecutorService executor = Executors.newFixedThreadPool(10);
+ for (int i = 0; i < 20; i++) {
+ final int index = i;
+ executor.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ template.sendBodyAndHeader("direct:start", "", "id", index);
+ return null;
+ }
+ });
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:start")
+ .multicast(new AggregationStrategy() {
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
+ String body = oldExchange.getIn().getBody(String.class);
+ oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
+ return oldExchange;
+ }
+ }).parallelProcessing()
+ .to("direct:a", "direct:b", "direct:c", "direct:d")
+ // use end to indicate end of multicast route
+ .end()
+ .to("mock:result");
+
+ from("direct:a").delay(20).setBody(body().append("A"));
+
+ from("direct:b").setBody(body().append("B"));
+
+ from("direct:c").delay(50).setBody(body().append("C"));
+
+ from("direct:d").delay(10).setBody(body().append("D"));
+ }
+ };
+ }
+
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelStressTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MulticastParallelTest.java Sat May 23 06:56:56 2009
@@ -18,6 +18,7 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.processor.aggregate.AggregationStrategy;
@@ -27,7 +28,7 @@
*/
public class MulticastParallelTest extends ContextTestSupport {
- public void testMulticastParallel() throws Exception {
+ public void testSingleMulticastParallel() throws Exception {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived("AB");
@@ -36,6 +37,23 @@
assertMockEndpointsSatisfied();
}
+ public void testMulticastParallel() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(20);
+ mock.whenAnyExchangeReceived(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ // they should all be AB even though A is slower than B
+ assertEquals("AB", exchange.getIn().getBody(String.class));
+ }
+ });
+
+ for (int i = 0; i < 20; i++) {
+ template.sendBody("direct:start", "Hello");
+ }
+
+ assertMockEndpointsSatisfied();
+ }
+
@Override
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
@@ -44,6 +62,10 @@
from("direct:start")
.multicast(new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
String body = oldExchange.getIn().getBody(String.class);
oldExchange.getIn().setBody(body + newExchange.getIn().getBody(String.class));
return oldExchange;
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyAggregationStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyAggregationStrategy.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/MyAggregationStrategy.java Sat May 23 06:56:56 2009
@@ -24,22 +24,23 @@
* @version $Revision$
*/
public class MyAggregationStrategy extends UseLatestAggregationStrategy {
+
@Override
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
Exchange result = super.aggregate(oldExchange, newExchange);
- Integer old = (Integer) oldExchange.getProperty("aggregated");
- if (old == null) {
- old = 1;
+ if (oldExchange == null) {
+ result.setProperty("aggregated", 1);
+ } else {
+ Integer old = oldExchange.getProperty("aggregated", Integer.class);
+ result.setProperty("aggregated", old + 1);
}
- result.setProperty("aggregated", old + 1);
return result;
}
/**
* An expression used to determine if the aggregation is complete
*/
- public boolean isCompleted(@Header("aggregated")
- Integer aggregated) {
+ public boolean isCompleted(@Header("aggregated") Integer aggregated) {
if (aggregated == null) {
return false;
}
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/SplitAggregateInOutTest.java Sat May 23 06:56:56 2009
@@ -117,6 +117,12 @@
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// put order together in old exchange by adding the order from new exchange
+ if (oldExchange == null) {
+ // the first time we aggregate we only have the new exchange,
+ // so we just return it
+ return newExchange;
+ }
+
// copy from OUT as we use InOut pattern
String orders = oldExchange.getOut().getBody(String.class);
String newLine = newExchange.getOut().getBody(String.class);
Added: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java?rev=777808&view=auto
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java (added)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java Sat May 23 06:56:56 2009
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor.aggregator;
+
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.processor.aggregate.AggregationStrategy;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * @version $Revision$
+ */
+public class AggregatorConcurrencyTest extends ContextTestSupport {
+
+ private static final transient Log LOG = LogFactory.getLog(AggregatorConcurrencyTest.class);
+
+ private static final AtomicInteger COUNTER = new AtomicInteger(0);
+ private static final AtomicInteger SUM = new AtomicInteger(0);
+
+ private final int size = 100;
+ private final String uri = "direct:start";
+
+ public void testAggregateConcurrency() throws Exception {
+ MockEndpoint mock = getMockEndpoint("mock:result");
+ mock.expectedMessageCount(1);
+
+ int total = 0;
+ ExecutorService service = Executors.newFixedThreadPool(20);
+ for (int i = 0; i < size; i++) {
+ final int count = i;
+ total += i;
+ service.submit(new Callable<Object>() {
+ public Object call() throws Exception {
+ template.sendBodyAndHeader(uri, "Hello World", "index", count);
+ return null;
+ }
+ });
+ }
+ mock.expectedBodiesReceived(total);
+ mock.expectedHeaderReceived("total", total);
+ mock.expectedPropertyReceived(Exchange.AGGREGATED_SIZE, size);
+
+ assertMockEndpointsSatisfied();
+
+ assertEquals(100, COUNTER.get());
+ }
+
+ @Override
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ public void configure() throws Exception {
+ from(uri)
+ .aggregate(constant(true), new AggregationStrategy() {
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ Exchange answer = oldExchange != null ? oldExchange : newExchange;
+ COUNTER.getAndIncrement();
+
+ Integer newIndex = newExchange.getIn().getHeader("index", Integer.class);
+ int total = SUM.addAndGet(newIndex);
+ answer.getIn().setHeader("total", total);
+
+ LOG.debug("Index: " + newIndex + ". Total so far: " + total);
+ return answer;
+ }
+ }).batchTimeout(60000).completionPredicate(property(Exchange.AGGREGATED_SIZE).isEqualTo(100))
+ .to("direct:foo");
+
+ from("direct:foo").setBody().header("total").to("mock:result");
+ }
+ };
+ }
+}
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AggregatorConcurrencyTest.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/AlbertoAggregatorTest.java Sat May 23 06:56:56 2009
@@ -23,7 +23,6 @@
import org.apache.camel.ContextTestSupport;
import org.apache.camel.Exchange;
-import org.apache.camel.Message;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
@@ -76,82 +75,62 @@
return new RouteBuilder() {
AggregationStrategy surnameAggregator = new AggregationStrategy() {
- public Exchange aggregate(Exchange oldExchange,
- Exchange newExchange) {
-
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
debugIn("Surname Aggregator", oldExchange, newExchange);
- Message oldIn = oldExchange.getIn();
- Message newIn = newExchange.getIn();
-
- List<String> brothers = null;
- if (oldIn.getBody() instanceof List) {
+ Exchange answer = newExchange;
- brothers = oldIn.getBody(List.class);
- brothers.add(newIn.getBody(String.class));
+ if (oldExchange != null) {
+ List<String> brothers = oldExchange.getIn().getBody(List.class);
+ brothers.add(newExchange.getIn().getBody(String.class));
+ answer = oldExchange;
} else {
+ List<String>brothers = new ArrayList<String>();
+ brothers.add(newExchange.getIn().getBody(String.class));
+ newExchange.getIn().setBody(brothers);
+ }
- brothers = new ArrayList<String>();
- brothers.add(oldIn.getBody(String.class));
- brothers.add(newIn.getBody(String.class));
- oldExchange.getIn().setBody(brothers);
- } // else
-
- debugOut("Surname Aggregator", oldExchange);
+ debugOut("Surname Aggregator", answer);
- return oldExchange;
+ return answer;
}
};
- AggregationStrategy brothersAggregator = new AggregationStrategy() {
- public Exchange aggregate(Exchange oldExchange,
- Exchange newExchange) {
+ @SuppressWarnings("unchecked")
+ AggregationStrategy brothersAggregator = new AggregationStrategy() {
+ public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
debugIn("Brothers Aggregator", oldExchange, newExchange);
- Message oldIn = oldExchange.getIn();
- Message newIn = newExchange.getIn();
+ Exchange answer = newExchange;
- Map<String, List> brothers = null;
- if (oldIn.getBody() instanceof Map) {
-
- brothers = oldIn.getBody(Map.class);
- brothers.put(newIn.getHeader(SURNAME_HEADER,
- String.class), newIn.getBody(List.class));
+ if (oldExchange != null) {
+ Map<String, List> brothers = oldExchange.getIn().getBody(Map.class);
+ brothers.put(newExchange.getIn().getHeader(SURNAME_HEADER, String.class), newExchange.getIn().getBody(List.class));
+ answer = oldExchange;
} else {
+ Map<String, List> brothers = new HashMap<String, List>();
+ brothers.put(newExchange.getIn().getHeader(SURNAME_HEADER, String.class), newExchange.getIn().getBody(List.class));
+ newExchange.getIn().setBody(brothers);
+ }
- brothers = new HashMap<String, List>();
- brothers.put(oldIn.getHeader(SURNAME_HEADER, String.class),
- oldIn.getBody(List.class));
- brothers.put(newIn.getHeader(SURNAME_HEADER,
- String.class), newIn.getBody(List.class));
- oldExchange.getIn().setBody(brothers);
- } // else
-
- debugOut("Brothers Aggregator", oldExchange);
+ debugOut("Brothers Aggregator", answer);
- return oldExchange;
+ return answer;
}
};
- private void debugIn(String stringId, Exchange oldExchange,
- Exchange newExchange) {
-
- log.debug(stringId + " old headers in: "
- + oldExchange.getIn().getHeaders());
- log.debug(stringId + " old body in: "
- + oldExchange.getIn().getBody());
- log.debug(stringId + " new headers in: "
- + newExchange.getIn().getHeaders());
- log.debug(stringId + " new body in: "
- + newExchange.getIn().getBody());
+ private void debugIn(String stringId, Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange != null) {
+ log.debug(stringId + " old headers in: " + oldExchange.getIn().getHeaders());
+ log.debug(stringId + " old body in: " + oldExchange.getIn().getBody());
+ }
+ log.debug(stringId + " new headers in: " + newExchange.getIn().getHeaders());
+ log.debug(stringId + " new body in: " + newExchange.getIn().getBody());
}
private void debugOut(String stringId, Exchange exchange) {
-
- log.debug(stringId + " old headers out: "
- + exchange.getIn().getHeaders());
- log.debug(stringId + " old body out: "
- + exchange.getIn().getBody());
+ log.debug(stringId + " old headers out: " + exchange.getIn().getHeaders());
+ log.debug(stringId + " old body out: " + exchange.getIn().getBody());
}
@Override
Modified: camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java (original)
+++ camel/trunk/camel-core/src/test/java/org/apache/camel/processor/aggregator/CustomAggregationStrategyTest.java Sat May 23 06:56:56 2009
@@ -67,6 +67,10 @@
private static class MyAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ // the first time we only have the new exchange so it wins the first round
+ return newExchange;
+ }
int oldPrice = oldExchange.getIn().getBody(Integer.class);
int newPrice = newExchange.getIn().getBody(Integer.class);
// return the "winner" that has the highest price
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsExchange.java Sat May 23 06:56:56 2009
@@ -37,12 +37,6 @@
this.binding = binding;
}
- public JmsExchange(CamelContext context, ExchangePattern pattern, JmsBinding binding, Message message) {
- super(context, pattern);
- this.binding = binding;
- setIn(new JmsMessage(message));
- }
-
public JmsExchange(JmsEndpoint endpoint, ExchangePattern pattern, JmsBinding binding, Message message) {
this(endpoint, pattern, binding);
setIn(new JmsMessage(message));
Modified: camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java (original)
+++ camel/trunk/components/camel-jms/src/test/java/org/apache/camel/component/jms/AggregratedJmsRouteTest.java Sat May 23 06:56:56 2009
@@ -65,13 +65,10 @@
resultEndpoint.assertIsSatisfied(8000);
}
-
-
protected void sendExchange(String uri, final Object expectedBody) {
template.sendBodyAndHeader(uri, expectedBody, "cheese", 123);
}
-
protected CamelContext createCamelContext() throws Exception {
CamelContext camelContext = super.createCamelContext();
@@ -103,15 +100,17 @@
from("jms:queue:point3").process(new MyProcessor()).to("jms:queue:reply");
from("jms:queue:reply").aggregate(header("cheese"), new AggregationStrategy() {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
- Exchange copy = newExchange.copy();
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
LOG.info("try to aggregating the message ");
- Integer old = (Integer) oldExchange.getProperty("aggregated");
+ Integer old = oldExchange.getProperty("aggregated", Integer.class);
if (old == null) {
old = 1;
}
- Exchange result = copy;
- result.setProperty("aggregated", old + 1);
- return result;
+ oldExchange.setProperty("aggregated", old + 1);
+ return oldExchange;
}
}).completionPredicate(header("aggregated").isEqualTo(3))
.to("mock:reply");
Modified: camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedStrategy.java (original)
+++ camel/trunk/components/camel-rss/src/main/java/org/apache/camel/component/rss/AggregateRssFeedStrategy.java Sat May 23 06:56:56 2009
@@ -30,6 +30,9 @@
protected final transient Log log = LogFactory.getLog(AggregateRssFeedStrategy.class);
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ if (oldExchange == null) {
+ return newExchange;
+ }
SyndFeed oldFeed = oldExchange.getIn().getBody(SyndFeed.class);
SyndFeed newFeed = newExchange.getIn().getBody(SyndFeed.class);
if (oldFeed != null && newFeed != null) {
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/MyAggregator.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/MyAggregator.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/MyAggregator.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/MyAggregator.java Sat May 23 06:56:56 2009
@@ -32,6 +32,10 @@
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
// lets append the old body to the new body
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
String body = oldExchange.getIn().getBody(String.class);
if (body != null) {
Message newIn = newExchange.getIn();
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/SpringMulticastAggregatorTest.java Sat May 23 06:56:56 2009
@@ -17,12 +17,10 @@
package org.apache.camel.spring.processor;
import org.apache.camel.CamelContext;
-import org.apache.camel.processor.MultiCastAggregatorTest;
-
-
+import org.apache.camel.processor.MulticastAnotherAggregatorTest;
import static org.apache.camel.spring.processor.SpringTestHelper.createSpringCamelContext;
-public class SpringMulticastAggregatorTest extends MultiCastAggregatorTest {
+public class SpringMulticastAggregatorTest extends MulticastAnotherAggregatorTest {
protected CamelContext createCamelContext() throws Exception {
return createSpringCamelContext(this, "org/apache/camel/spring/processor/multicastAggregator.xml");
Modified: camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/LowestQuoteAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/LowestQuoteAggregationStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/LowestQuoteAggregationStrategy.java (original)
+++ camel/trunk/components/camel-spring/src/test/java/org/apache/camel/spring/processor/scattergather/LowestQuoteAggregationStrategy.java Sat May 23 06:56:56 2009
@@ -22,6 +22,11 @@
// START SNIPPET: e1
public class LowestQuoteAggregationStrategy implements AggregationStrategy {
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ // the first time we only have the new exchange
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
if (oldExchange.getIn().getBody(int.class) < newExchange.getIn().getBody(int.class)) {
return oldExchange;
} else {
Modified: camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java (original)
+++ camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/queue/version/BankResponseAggregationStrategy.java Sat May 23 06:56:56 2009
@@ -36,6 +36,11 @@
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
LOG.debug("Get the exchange to aggregate, older: " + oldExchange + " newer:" + newExchange);
+ // the first time we only have the new exchange
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
Message oldMessage;
Message newMessage;
if (aggregatingOutMessage) {
Modified: camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/BankResponseAggregationStrategy.java
URL: http://svn.apache.org/viewvc/camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/BankResponseAggregationStrategy.java?rev=777808&r1=777807&r2=777808&view=diff
==============================================================================
--- camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/BankResponseAggregationStrategy.java (original)
+++ camel/trunk/examples/camel-example-loan-broker/src/main/java/org/apache/camel/loanbroker/webservice/version/BankResponseAggregationStrategy.java Sat May 23 06:56:56 2009
@@ -26,6 +26,11 @@
public static final String BANK_QUOTE = "bank_quote";
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
+ // the first time we only have the new exchange
+ if (oldExchange == null) {
+ return newExchange;
+ }
+
// Get the bank quote instance from the exchange
BankQuote oldQuote = oldExchange.getProperty(BANK_QUOTE, BankQuote.class);
// Get the oldQute from out message body if we can't get it from the exchange
@@ -50,7 +55,6 @@
result.getOut().setBody("The best rate is " + bankQuote.toString());
return result;
-
}
}