You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by honno <ho...@163.com> on 2009/12/24 08:44:10 UTC
Multicasting jms message with AggregationStrategy occur errors
When I send messages to a JMS endpoint with multicast, I get incorrect results.
This is my JUnit test:
/**
 * Test case that multicasts a message arriving on {@code jms:start} to
 * {@code jms:a} and {@code jms:b}, merges the resulting exchanges with a
 * body-concatenating {@link AggregationStrategy}, and forwards the merged
 * exchange to {@code mock:result}.
 */
public class MulticastStrategyRefClientTemplateDSL extends ContextTest {

    /**
     * Sends a single "Hello" body to {@code jms:start} via
     * {@code template.sendBody} and expects {@code mock:result} to receive
     * exactly one message whose body is "AB".
     */
    public void testSingleMulticastParallel() throws Exception {
        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
        resultEndpoint.expectedBodiesReceived("AB");

        template.sendBody("jms:start", "Hello");

        assertMockEndpointsSatisfied();
    }

    /**
     * Sends "Hello" twenty times and expects twenty messages on
     * {@code mock:result}, each with the body "AB".
     *
     * NOTE(review): the method name starts with "a" rather than "test" --
     * presumably to keep the test runner from picking it up; confirm.
     * It also sends to {@code direct:start}, while the routes configured in
     * this class consume from {@code jms:start} only.
     */
    public void atestMulticastParallel() throws Exception {
        MockEndpoint resultEndpoint = getMockEndpoint("mock:result");
        resultEndpoint.expectedMessageCount(20);
        resultEndpoint.whenAnyExchangeReceived(new Processor() {
            public void process(Exchange exchange) throws Exception {
                // they should all be AB even though A is slower than B
                String receivedBody = exchange.getIn().getBody(String.class);
                assertEquals("AB", receivedBody);
            }
        });

        int remaining = 20;
        while (remaining > 0) {
            template.sendBody("direct:start", "Hello");
            remaining--;
        }

        assertMockEndpointsSatisfied();
    }

    /**
     * Builds the three routes under test: the multicast route from
     * {@code jms:start}, plus the {@code jms:a} and {@code jms:b} routes
     * that set the constant bodies "A" and "B" respectively.
     */
    @Override
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                // Appends the new exchange's body to the body accumulated so far;
                // the first exchange (no previous one) is passed through unchanged.
                AggregationStrategy concatenateBodies = new AggregationStrategy() {
                    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
                        if (oldExchange != null) {
                            String accumulated = oldExchange.getIn().getBody(String.class);
                            oldExchange.getIn().setBody(
                                    accumulated + newExchange.getIn().getBody(String.class));
                            return oldExchange;
                        }
                        return newExchange;
                    }
                };

                from("jms:start")
                    .multicast(concatenateBodies)
                        .to("jms:a", "jms:b")
                    // use end to indicate end of multicast route
                    .end()
                    .to("mock:result");

                // Route A is delayed by 100 before setting its body.
                from("jms:a").delay(100).setBody(constant("A"));

                from("jms:b").setBody(constant("B"));
            }
        };
    }
}
/**
 * Base class for the tests in this post: it replaces the inherited
 * {@code setUp()} so that a "jms" component is registered on the Camel
 * context before any routes are added and the context is started.
 */
public abstract class ContextTest extends ContextTestSupport {

    /**
     * Creates and validates the Camel context, registers the "jms" component
     * (built by {@code jmsComponentClientAcknowledge} from an ActiveMQ
     * connection factory for {@code vm://localhost?broker.persistent=false}),
     * creates the producer and consumer templates, and -- when
     * {@code isUseRouteBuilder()} is true -- adds every route builder from
     * {@code createRouteBuilders()} and starts the context.
     */
    @Override
    protected void setUp() throws Exception {
        context = createCamelContext();
        assertValidContext(context);

        ConnectionFactory brokerConnectionFactory =
                new ActiveMQConnectionFactory("vm://localhost?broker.persistent=false");
        context.addComponent("jms", jmsComponentClientAcknowledge(brokerConnectionFactory));

        template = context.createProducerTemplate();
        consumer = context.createConsumerTemplate();

        // Without route builders there is nothing to add; the context is not started here.
        if (!isUseRouteBuilder()) {
            log.debug("isUseRouteBuilder() is false");
            return;
        }

        for (RouteBuilder routeBuilder : createRouteBuilders()) {
            log.debug("Using created route builder: " + routeBuilder);
            context.addRoutes(routeBuilder);
        }
        startCamelContext();
    }
}
The JUnit failure trace is:
java.lang.AssertionError: mock://result Body of message: 0. Expected: <AB> but was: <HelloHello>
at org.apache.camel.component.mock.MockEndpoint.fail(MockEndpoint.java:899)
at org.apache.camel.component.mock.MockEndpoint.assertEquals(MockEndpoint.java:881)
But if I replace the "jms" endpoints with "direct" endpoints, the JUnit test case passes.
What is the problem?
--
View this message in context: http://old.nabble.com/Multicasting-jms-message-with-AggregationStrategy-occur-errors-tp26911286p26911286.html
Sent from the Camel - Users mailing list archive at Nabble.com.
Re: Multicasting jms message with AggregationStrategy occur errors
Posted by Claus Ibsen <cl...@gmail.com>.
You need to use requestBody, because you want the InOut MEP (message exchange pattern);
sendBody is InOnly.
Alternatively, change the MEP in the route. See this EIP pattern:
http://camel.apache.org/request-reply.html
On Thu, Dec 24, 2009 at 8:44 AM, honno <ho...@163.com> wrote:
>
> When i send messages to jms endpoint with multicast ,i get error results.
> This is my junit test.
> public class MulticastStrategyRefClientTemplateDSL extends ContextTest {
>
> public void testSingleMulticastParallel() throws Exception {
> MockEndpoint mock = getMockEndpoint("mock:result");
> mock.expectedBodiesReceived("AB");
>
> template.sendBody("jms:start", "Hello");
>
> assertMockEndpointsSatisfied();
> }
>
> public void atestMulticastParallel() throws Exception {
> MockEndpoint mock = getMockEndpoint("mock:result");
> mock.expectedMessageCount(20);
> mock.whenAnyExchangeReceived(new Processor() {
> public void process(Exchange exchange) throws Exception {
> // they should all be AB even though A is slower than B
> assertEquals("AB", exchange.getIn().getBody(String.class));
> }
> });
>
> for (int i = 0; i < 20; i++) {
> template.sendBody("direct:start", "Hello");
> }
>
> assertMockEndpointsSatisfied();
> }
>
> @Override
> protected RouteBuilder createRouteBuilder() throws Exception {
> return new RouteBuilder() {
> @Override
> public void configure() throws Exception {
> from("jms:start").multicast(new AggregationStrategy() {
> public Exchange aggregate(Exchange oldExchange,
> Exchange newExchange) {
> if (oldExchange == null) {
> return newExchange;
> }
>
> String body = oldExchange.getIn().getBody(String.class);
> oldExchange.getIn().setBody(
> body
> + newExchange.getIn().getBody(
> String.class));
> return oldExchange;
> }
> }).to("jms:a", "jms:b")
> // use end to indicate end of multicast route
> .end().to("mock:result");
>
> from("jms:a").delay(100).setBody(constant("A"));
>
> from("jms:b").setBody(constant("B"));
> }
> };
> }
> }
>
>
> public abstract class ContextTest extends ContextTestSupport {
>
> @Override
> protected void setUp() throws Exception {
> context = createCamelContext();
> assertValidContext(context);
>
> ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
> "vm://localhost?broker.persistent=false");
> context.addComponent("jms",
> jmsComponentClientAcknowledge(connectionFactory));
>
> template = context.createProducerTemplate();
> consumer = context.createConsumerTemplate();
>
> if (isUseRouteBuilder()) {
> RouteBuilder[] builders = createRouteBuilders();
> for (RouteBuilder builder : builders) {
> log.debug("Using created route builder: " + builder);
> context.addRoutes(builder);
> }
> startCamelContext();
> } else {
> log.debug("isUseRouteBuilder() is false");
> }
> }
>
> }
>
> The junit failure trace is :
>
> java.lang.AssertionError: mock://result Body of message: 0. Expected: <AB>
> but was: <HelloHello>
> at org.apache.camel.component.mock.MockEndpoint.fail(MockEndpoint.java:899)
> at
> org.apache.camel.component.mock.MockEndpoint.assertEquals(MockEndpoint.java:881)
>
>
> but if i replace "jms" endpoint with "direct" endpoint .the junit case is
> ok.
> what is the problem ?
>
> --
> View this message in context: http://old.nabble.com/Multicasting-jms-message-with-AggregationStrategy-occur-errors-tp26911286p26911286.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>
--
Claus Ibsen
Apache Camel Committer
Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus