Posted to users@camel.apache.org by RobMMS <ro...@t-systems-mms.com> on 2008/11/07 17:00:08 UTC

RE: ExchangePatterns on Bean-Component

Hi,
I enabled the Tracer and the exchange is routed as I would expect, but the
problem remains that
List result = (List) template.sendBody("direct:start", ExchangePattern.InOut,
myQueryString);

only contains ResultObjectB instead of the aggregated List.
I've written a smaller test case in plain Java DSL, where I just process
Strings and want to aggregate them and return the aggregated
exchange body:


ROUTE:
public void testAggregation() throws Exception {			
		cc.addRoutes(new RouteBuilder() {
			@Override
			public void configure() throws Exception {
				MyAggregationStrategy ag = new MyAggregationStrategy();
			
				from("direct:start").setHeader("ID").constant(1)
					.multicast().setParallelProcessing(true)
					.to("direct:stage1", "direct:stage2");
				
				from("direct:stage1").process(new LogProcessor("A")).to("direct:merge");
				from("direct:stage2").process(new LogProcessor("B")).to("direct:merge");				
				
				from("direct:merge").aggregator(header("ID"),ag).batchSize(2).
				to("mock:end");				
			}			
		});		
		
		String res = (String) pt.requestBodyAndHeader("direct:start", "test", "ID", 1);
		
		System.out.println("RESULT:\n"+ res);	
		assertEquals("A_test#B_test", res);
		
		MockEndpoint mock = (MockEndpoint) cc.getEndpoint("mock:end");
		mock.expectedMessageCount(1);
		mock.assertIsSatisfied();		
	}

LogProcessor:
public class LogProcessor implements Processor {

	private String name;
	
	public LogProcessor(String name) {
		this.name=name;
	}
	
	public void process(Exchange exchange) throws Exception {
		// Prefix the incoming body with this processor's name, e.g. "A_test"
		exchange.getIn().setBody(name+"_"+exchange.getIn().getBody());
	}
}

MyAggregationStrategy:
public class MyAggregationStrategy implements AggregationStrategy {

	private Integer count = 0;
	
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) {
        count = (Integer) oldExchange.getIn().getHeader("aggregated");  
        
    	if(count==null)
    		count=1;
    	
    	count++;
    	
    	String oldBody = oldExchange.getIn().getBody(String.class);
        String newBody = newExchange.getIn().getBody(String.class);
        
        Exchange copy = newExchange.copy();
        copy.getIn().setBody(oldBody+"#"+newBody);
        copy.getIn().setHeader("aggregated", count);
        System.out.println(copy.getIn());
        return copy;
    }
    
    public boolean isCompleted() {
    	if(count!=null)
    		System.out.println("Aggregated? -> "+(getCount()==2));
    	return getCount()==2;
    }
    
    public int getCount() {
    	return count;
    }
    
}

Here is the trace output:
07.11.2008 16:55:05 org.apache.camel.impl.DefaultCamelContext <init>
INFO: JMX is disabled. Using DefaultLifecycleStrategy.
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: 
<TRACER>
  ID:		ID-WUM128085/4570-1226073306211/0-0
  Node:		SetHeader[ID, 1]
  Headers:	{ID=1}
  Body:		test
  Pattern:	InOut
  Time:		1226073306321
</TRACER>

07.11.2008 16:55:06 org.apache.camel.impl.converter.DefaultTypeConverter
addTypeConverter
WARNUNG: Overriding type converter from: StaticMethodTypeConverter: public
static java.lang.String
org.apache.camel.converter.IOConverter.toString(javax.xml.transform.Source)
throws javax.xml.transform.TransformerException,java.io.IOException to:
InstanceMethodTypeConverter: public java.lang.String
org.apache.camel.converter.jaxp.XmlConverter.toString(javax.xml.transform.Source)
throws javax.xml.transform.TransformerException
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: 
<TRACER>
  ID:		ID-WUM128085/4570-1226073306211/0-1
  Node:		To[direct:stage1]
  Headers:	{ID=1}
  Body:		test
  Pattern:	InOut
  Time:		1226073306508
</TRACER>

07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: 
<TRACER>
  ID:		ID-WUM128085/4570-1226073306211/0-1
  Node:		process[de.mms.test.processor.LogProcessor@4aa0ce]
  Headers:	{ID=1}
  Body:		test
  Pattern:	InOut
  Time:		1226073306508
</TRACER>

07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: 
<TRACER>
  ID:		ID-WUM128085/4570-1226073306211/0-1
  Node:		To[direct:merge]
  Headers:	{ID=1}
  Body:		A_test
  Pattern:	InOut
  Time:		1226073306508
</TRACER>

07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: 
<TRACER>
  ID:		ID-WUM128085/4570-1226073306211/0-2
  Node:		To[direct:stage2]
  Headers:	{ID=1}
  Body:		test
  Pattern:	InOut
  Time:		1226073306524
</TRACER>

07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: 
<TRACER>
  ID:		ID-WUM128085/4570-1226073306211/0-2
  Node:		process[de.mms.test.processor.LogProcessor@1e4f7c2]
  Headers:	{ID=1}
  Body:		test
  Pattern:	InOut
  Time:		1226073306524
</TRACER>

07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: 
<TRACER>
  ID:		ID-WUM128085/4570-1226073306211/0-2
  Node:		To[direct:merge]
  Headers:	{ID=1}
  Body:		B_test
  Pattern:	InOut
  Time:		1226073306524
</TRACER>

Message: A_test#B_test
07.11.2008 16:55:06 org.apache.camel.processor.Logger process
INFO: 
<TRACER>
  ID:		ID-WUM128085/4570-1226073306211/0-3
  Node:		To[mock:end]
  Headers:	{aggregated=2, ID=1}
  Body:		A_test#B_test
  Pattern:	InOut
  Time:		1226073306524
</TRACER>

This test fails with
junit.framework.ComparisonFailure: result ok? expected:<[A_test#]B_test> but
was:<[]B_test>
although mock:end receives the correct body.

What do I have to do to receive
A_test#B_test from my call:
String res = (String) pt.requestBodyAndHeader("direct:start", "test", "ID",
1); ??
is it actually possible with direct endpoints?


thx,
Rob




Claus Ibsen wrote:
> 
> Hi
> 
> You can enable the tracer to trace how the exchanges are
> routed in Camel.
> http://activemq.apache.org/camel/tracer.html
> 
> And I suppose you made a typo in this mail, as you wrote from("direct:a")
> twice:
> from("direct:a").process(new myProcessor()).bean(beanB,
> "methodB").to("direct:merge");
> 
> It should be: from("direct:b")
> 
> 
> 
> Kind regards
>  
> Claus Ibsen
> ......................................
> Silverbullet
> Skovsgårdsvænget 21
> 8362 Hørning
> Tlf. +45 2962 7576
> Web: www.silverbullet.dk
> 
> -----Original Message-----
> From: RobMMS [mailto:robert.pelger@t-systems-mms.com] 
> Sent: 27 October 2008 13:15
> To: camel-user@activemq.apache.org
> Subject: ExchangePatterns on Bean-Component
> 
> 
> Hi,
> I'm observing a somewhat strange behaviour in my route:
> 
> from("direct:start").multicast(new MyOutAggregationStrategy(),
> true).to("direct:a", "direct:b");
> 
> from("direct:a").process(new myProcessor()).bean(beanA,
> "methodA").to("direct:merge");
> from("direct:a").process(new myProcessor()).bean(beanB,
> "methodB").to("direct:merge");
> 
> from("direct:merge").
>  aggregator(header(QueryPreProcessor.HEADER_CORRELATION_ID), new
> ResultAggregationStrategy()).
>  completedPredicate(header("aggregated").
>  isEqualTo(header("parallelStagesCount"))).
> to("mock:end");
> 
> List result =(List) template.sendBody("direct:start",
> ExchangePattern.InOut,
> myQueryString);
> 
> 
> What I want to do here is send a query String to direct:a, which then
> multicasts to my beans working in parallel. Each takes a String value as
> a parameter in methodA/B and returns a ResultObjectA/B, which is set as the
> body automatically (I do not set this explicitly in the beans).
> My ResultAggregationStrategy then puts ResultObjectA and ResultObjectB
> into a List, which I want to get back as the result of the
> template.sendBody call.
> 
> 
> The problem is that most of the time my result is the return value of the
> first bean processed; only sometimes (around 10% of the time) does it
> return the aggregated list as expected.
> 
> I would expect my route to process each step and, when the aggregated
> message reaches mock:end, to return the message's out body to the caller
> as the result.
> I'm not quite sure when I have to set the message's Out body and when it's
> OK to set the In body. I think that's the problem here, but I'm not
> sure.
> 
> Maybe I'm completely wrong, so I hope you get what I need and can tell me
> what's wrong with my route ;)
> 
> 
> Thanks, 
> Rob
> -- 
> View this message in context:
> http://www.nabble.com/ExchangePatterns-on-Bean-Component-tp20186540s22882p20186540.html
> Sent from the Camel - Users mailing list archive at Nabble.com.
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/ExchangePatterns-on-Bean-Component-tp20186540s22882p20383351.html
Sent from the Camel - Users mailing list archive at Nabble.com.


RE: ExchangePatterns on Bean-Component

Posted by Claus Ibsen <ci...@silverbullet.dk>.
Hi

> is it actually possible with direct endpoints?
No

When you send an exchange to the aggregator using e.g. a producer template (requestBody), the exchange is completed immediately, because the aggregator is like a queue with an event-driven consumer that does the aggregation. So the route path is broken when you hit the aggregator. That is why Camel returns the "input" from the requestBody method.

You cannot send to an aggregator, wait until it has aggregated, and then get the "result".

To get the result you must consume from the aggregator, as the mock endpoint does.
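
As a rough illustration of that behaviour, here is a plain-Java analogy. It is not Camel code and does not use any Camel API: the class AggregatorAnalogy and its methods send and awaitAggregate are made up for this sketch. It only models the idea of a queue with a background consumer, where the sender gets its own input back and only a consumer behind the aggregator sees the combined message.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical model of "aggregator = queue + event-driven consumer".
public class AggregatorAnalogy {

    // Senders drop messages here and return at once.
    private final BlockingQueue<String> inbox = new LinkedBlockingQueue<>();
    // Stand-in for the endpoint behind the aggregator (mock:end in the test).
    private final BlockingQueue<String> aggregated = new LinkedBlockingQueue<>();

    public AggregatorAnalogy(int batchSize) {
        Thread consumer = new Thread(() -> {
            try {
                while (true) {
                    // Join batchSize messages with '#', like the strategy in the test.
                    StringBuilder batch = new StringBuilder();
                    for (int i = 0; i < batchSize; i++) {
                        if (i > 0) {
                            batch.append('#');
                        }
                        batch.append(inbox.take());
                    }
                    aggregated.put(batch.toString());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.setDaemon(true);
        consumer.start();
    }

    // The "request" ends at the queue, so the caller only gets its own input back.
    public String send(String body) {
        inbox.add(body);
        return body;
    }

    // Only a consumer behind the aggregator can see the combined message.
    public String awaitAggregate(long timeoutMillis) throws InterruptedException {
        return aggregated.poll(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        AggregatorAnalogy agg = new AggregatorAnalogy(2);
        agg.send("A_test");
        String reply = agg.send("B_test");
        System.out.println("caller sees:   " + reply);
        System.out.println("consumer sees: " + agg.awaitAggregate(1000));
    }
}
```

In this model the caller's reply is B_test, while the consumer side receives A_test#B_test, which mirrors what the failing assertion and the mock:end trace show.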



Kind regards
 
Claus Ibsen
......................................
Silverbullet
Skovsgårdsvænget 21
8362 Hørning
Tlf. +45 2962 7576
Web: www.silverbullet.dk