You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by honno <ho...@163.com> on 2009/12/24 08:44:10 UTC

Multicasting jms message with AggregationStrategy occur errors

When i send messages to jms endpoint with multicast ,i get error results.
This is my junit test.
public class MulticastStrategyRefClientTemplateDSL extends ContextTest {

	public void testSingleMulticastParallel() throws Exception {
		MockEndpoint mock = getMockEndpoint("mock:result");
		mock.expectedBodiesReceived("AB");

		template.sendBody("jms:start", "Hello");

		assertMockEndpointsSatisfied();
	}

	public void atestMulticastParallel() throws Exception {
		MockEndpoint mock = getMockEndpoint("mock:result");
		mock.expectedMessageCount(20);
		mock.whenAnyExchangeReceived(new Processor() {
			public void process(Exchange exchange) throws Exception {
				// they should all be AB even though A is slower than B
				assertEquals("AB", exchange.getIn().getBody(String.class));
			}
		});

		for (int i = 0; i < 20; i++) {
			template.sendBody("direct:start", "Hello");
		}

		assertMockEndpointsSatisfied();
	}

	@Override
	protected RouteBuilder createRouteBuilder() throws Exception {
		return new RouteBuilder() {
			@Override
			public void configure() throws Exception {
				from("jms:start").multicast(new AggregationStrategy() {
					public Exchange aggregate(Exchange oldExchange,
							Exchange newExchange) {
						if (oldExchange == null) {
							return newExchange;
						}

						String body = oldExchange.getIn().getBody(String.class);
						oldExchange.getIn().setBody(
								body
										+ newExchange.getIn().getBody(
												String.class));
						return oldExchange;
					}
				}).to("jms:a", "jms:b")
				// use end to indicate end of multicast route
						.end().to("mock:result");

				from("jms:a").delay(100).setBody(constant("A"));

				from("jms:b").setBody(constant("B"));
			}
		};
	}
}


public abstract class ContextTest extends ContextTestSupport {

	@Override
	protected void setUp() throws Exception {
		context = createCamelContext();
		assertValidContext(context);

		ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
				"vm://localhost?broker.persistent=false");
		context.addComponent("jms",
				jmsComponentClientAcknowledge(connectionFactory));

		template = context.createProducerTemplate();
		consumer = context.createConsumerTemplate();

		if (isUseRouteBuilder()) {
			RouteBuilder[] builders = createRouteBuilders();
			for (RouteBuilder builder : builders) {
				log.debug("Using created route builder: " + builder);
				context.addRoutes(builder);
			}
			startCamelContext();
		} else {
			log.debug("isUseRouteBuilder() is false");
		}
	}

}

The junit failure trace is :

java.lang.AssertionError: mock://result Body of message: 0. Expected: <AB>
but was: <HelloHello>
	at org.apache.camel.component.mock.MockEndpoint.fail(MockEndpoint.java:899)
	at
org.apache.camel.component.mock.MockEndpoint.assertEquals(MockEndpoint.java:881)


but if i replace "jms" endpoint with "direct" endpoint .the junit case is
ok.
what is the problem ?
	
-- 
View this message in context: http://old.nabble.com/Multicasting-jms-message-with-AggregationStrategy-occur-errors-tp26911286p26911286.html
Sent from the Camel - Users mailing list archive at Nabble.com.


Re: Multicasting jms message with AggregationStrategy occur errors

Posted by Claus Ibsen <cl...@gmail.com>.
You need to use requestBody as you want InOut MEP.
sendBody is InOnly.

Or change the MEP in the route etc. See this EIP pattern
http://camel.apache.org/request-reply.html


On Thu, Dec 24, 2009 at 8:44 AM, honno <ho...@163.com> wrote:
>
> When i send messages to jms endpoint with multicast ,i get error results.
> This is my junit test.
> public class MulticastStrategyRefClientTemplateDSL extends ContextTest {
>
>        public void testSingleMulticastParallel() throws Exception {
>                MockEndpoint mock = getMockEndpoint("mock:result");
>                mock.expectedBodiesReceived("AB");
>
>                template.sendBody("jms:start", "Hello");
>
>                assertMockEndpointsSatisfied();
>        }
>
>        public void atestMulticastParallel() throws Exception {
>                MockEndpoint mock = getMockEndpoint("mock:result");
>                mock.expectedMessageCount(20);
>                mock.whenAnyExchangeReceived(new Processor() {
>                        public void process(Exchange exchange) throws Exception {
>                                // they should all be AB even though A is slower than B
>                                assertEquals("AB", exchange.getIn().getBody(String.class));
>                        }
>                });
>
>                for (int i = 0; i < 20; i++) {
>                        template.sendBody("direct:start", "Hello");
>                }
>
>                assertMockEndpointsSatisfied();
>        }
>
>        @Override
>        protected RouteBuilder createRouteBuilder() throws Exception {
>                return new RouteBuilder() {
>                        @Override
>                        public void configure() throws Exception {
>                                from("jms:start").multicast(new AggregationStrategy() {
>                                        public Exchange aggregate(Exchange oldExchange,
>                                                        Exchange newExchange) {
>                                                if (oldExchange == null) {
>                                                        return newExchange;
>                                                }
>
>                                                String body = oldExchange.getIn().getBody(String.class);
>                                                oldExchange.getIn().setBody(
>                                                                body
>                                                                                + newExchange.getIn().getBody(
>                                                                                                String.class));
>                                                return oldExchange;
>                                        }
>                                }).to("jms:a", "jms:b")
>                                // use end to indicate end of multicast route
>                                                .end().to("mock:result");
>
>                                from("jms:a").delay(100).setBody(constant("A"));
>
>                                from("jms:b").setBody(constant("B"));
>                        }
>                };
>        }
> }
>
>
> public abstract class ContextTest extends ContextTestSupport {
>
>        @Override
>        protected void setUp() throws Exception {
>                context = createCamelContext();
>                assertValidContext(context);
>
>                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
>                                "vm://localhost?broker.persistent=false");
>                context.addComponent("jms",
>                                jmsComponentClientAcknowledge(connectionFactory));
>
>                template = context.createProducerTemplate();
>                consumer = context.createConsumerTemplate();
>
>                if (isUseRouteBuilder()) {
>                        RouteBuilder[] builders = createRouteBuilders();
>                        for (RouteBuilder builder : builders) {
>                                log.debug("Using created route builder: " + builder);
>                                context.addRoutes(builder);
>                        }
>                        startCamelContext();
>                } else {
>                        log.debug("isUseRouteBuilder() is false");
>                }
>        }
>
> }
>
> The junit failure trace is :
>
> java.lang.AssertionError: mock://result Body of message: 0. Expected: <AB>
> but was: <HelloHello>
>        at org.apache.camel.component.mock.MockEndpoint.fail(MockEndpoint.java:899)
>        at
> org.apache.camel.component.mock.MockEndpoint.assertEquals(MockEndpoint.java:881)
>
>
> but if i replace "jms" endpoint with "direct" endpoint .the junit case is
> ok.
> what is the problem ?
>
> --
> View this message in context: http://old.nabble.com/Multicasting-jms-message-with-AggregationStrategy-occur-errors-tp26911286p26911286.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
>
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus