Posted to users@camel.apache.org by Gowtham <go...@gmail.com> on 2017/05/01 04:43:27 UTC

REG: Aggregator not working fine in camel version 2.8.3

I'm using the Spring Framework for IoC in my project, and Apache Camel to
read CSV files and marshal and unmarshal them. My requirement is to merge
the marshalled data from two CSV files and store it in a single file in an
output folder. That's how I came across the Apache Camel Aggregator EIP. I
tried to understand it and gave it a go.

This is what I have done so far. I have three folders: _input, _intermediate
and _output. The _input folder contains an _input.txt file with some text,
and similarly the _intermediate folder contains _intermediate.txt with some
text. In my route builder I have the following:

		from("file:///C://test//_input").setHeader("myID", constant(1))
			.log("Body from Route1 : ${body}")
			.to("direct:route-out");//.to("direct:route1-out");
	
		from("file:///C://test//_intermediate").setHeader("myID", constant(1))
			.log("Body from Route2 : ${body}")
			.to("direct:route-out");//.to("direct:route2-out");
		
		from("direct:route-out")
		.aggregate( header("myID"), myAggregationStrategy)
			.completionFromBatchConsumer()
			.log("Sending out ${body}")
			.to("file:///C://test//_output");

My aggregation strategy looks like this:

	@Override
	public Exchange aggregate(Exchange aggregatingExchange, Exchange incomingExchange) {
		
		// First message of a group: there is no aggregate yet.
		if (aggregatingExchange == null) {
			Map<String, Object> headerMap = new TreeMap<>(incomingExchange.getIn().getHeaders());
			System.out.println("Header Map : "+headerMap);
			// Problem: aggregatingExchange is null for every message, so this branch always runs
			return incomingExchange;
		}
		
		String oldBody = aggregatingExchange.getIn().getBody(String.class);
		String newBody = incomingExchange.getIn().getBody(String.class);
		
		String body = oldBody+"\n"+newBody;
		System.out.println("New Body : "+body);
		aggregatingExchange.getIn().setBody(body);
		
		return aggregatingExchange;
	}


When I run the above, both files from the _input and _intermediate folders
are simply moved to the _output folder. While debugging, I found that the
aggregating exchange is null even when the second message arrives. I have
set my correlation expression to header("myID"), which should put both
messages from the two routes into the same group, but they are treated as
if they belong to completely different groups. I'm using Camel version
2.8.2 (core, bindy and test).

Am I missing a step, or is this a problem with this version of Camel? I
can't move to another Camel version because of a business requirement, so
if the version is the cause, can you suggest an alternative way to achieve
the same result?



-----
Gowtham Alaguraj
--
View this message in context: http://camel.465427.n5.nabble.com/REG-Aggregator-not-working-fine-in-camel-version-2-8-3-tp5798536.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: REG: Aggregator not working fine in camel version 2.8.3

Posted by Gowtham <go...@gmail.com>.
Thanks Claus.

I have finally figured out the EIP required for my scenario. 



-----
Gowtham Alaguraj

Re: REG: Aggregator not working fine in camel version 2.8.3

Posted by Claus Ibsen <cl...@gmail.com>.
You cannot use 2 different routes with completionFromBatchConsumer and
expect that it "somehow magically" knows there are 2 routes.

That completion applies only to a single route and the batch of
messages that it polled.

Instead of 2 routes, use 1 route. You can use a filter to match which
directories to include, and enable the recursive option to scan into
sub folders.

If you want 2 routes, then use a different completion predicate to
signal when the group is done.
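
To make the difference concrete, here is a small plain-Java model of the
two completion modes. It is only a sketch and does not use the Camel API:
the class CompletionModel, the Msg type and the aggregate() method are
hypothetical names made up for illustration, and the batch bookkeeping is
a simplification of what the aggregator does internally. It assumes each
folder is polled separately and yields one file, so every message carries
a batch size of 1.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model, not Camel code: shows when a group is considered complete.
class CompletionModel {

    /** A message with a correlation key, a body, and the size of the poll batch it came from. */
    static final class Msg {
        final int key;
        final String body;
        final int batchSize;

        Msg(int key, String body, int batchSize) {
            this.key = key;
            this.body = body;
            this.batchSize = batchSize;
        }
    }

    /**
     * fixedSize > 0 models a size-based completion (group closes after that many messages).
     * fixedSize == 0 models batch-based completion (group closes once it has seen as many
     * messages as the batch size carried by the incoming message).
     */
    static List<String> aggregate(List<Msg> messages, int fixedSize) {
        Map<Integer, String> openBodies = new HashMap<>();
        Map<Integer, Integer> openCounts = new HashMap<>();
        List<String> completed = new ArrayList<>();

        for (Msg m : messages) {
            // Same contract as the strategy in the question: no aggregate yet means "old" is null.
            String old = openBodies.get(m.key);
            String merged = (old == null) ? m.body : old + "\n" + m.body;
            int count = openCounts.getOrDefault(m.key, 0) + 1;

            int target = (fixedSize > 0) ? fixedSize : m.batchSize;
            if (count >= target) {
                // Group is done: emit it and forget its state.
                completed.add(merged);
                openBodies.remove(m.key);
                openCounts.remove(m.key);
            } else {
                openBodies.put(m.key, merged);
                openCounts.put(m.key, count);
            }
        }
        return completed;
    }

    public static void main(String[] args) {
        List<Msg> messages = Arrays.asList(
                new Msg(1, "text from _input", 1),
                new Msg(1, "text from _intermediate", 1));

        // Batch-based: each message closes its own group, so nothing is merged.
        System.out.println(aggregate(messages, 0).size());
        // Size-based with 2: both messages end up in one merged group.
        System.out.println(aggregate(messages, 2).size());
    }
}
```

In the route itself, the equivalent change would be to replace
completionFromBatchConsumer() with a condition such as completionSize(2),
probably combined with completionTimeout(...) so that a group is still
released if only one of the two files ever arrives.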





-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2