Posted to users@camel.apache.org by Joe San <co...@gmail.com> on 2013/03/18 10:42:15 UTC

Camel Aggregator Strategy Question

Camel Users,

I need to achieve the following scenario:

I have a file system where 3 different CSV files will be written: csv1,
csv2 and csv3. All 3 files will be available at the same time. I need
to read all three files, do some processing on them and produce one CSV
file as a result.

My question is: do I need to know the names of the CSV files in advance
to read them from the file system? Or is there a way to read multiple
files at once and look up each file's contents by its file name, with
the file name available as a header?

Regards,
Joe

Re: Camel Aggregator Strategy Question

Posted by Jothi <co...@gmail.com>.
Claus,

Thanks for pointing to the Javadoc. In fact, I started out blind, with very
little idea of how an aggregation would work. But after working my way
through the above exercise, I got a good hold of it. My mistake was putting
the aggregation logic in a processor. I moved it to my aggregator and bang,
it worked the way I wanted.



--
View this message in context: http://camel.465427.n5.nabble.com/Camel-Aggregator-Strategy-Question-tp5729337p5729570.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Camel Aggregator Strategy Question

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Please read the documentation and do a bit of research yourself first.

For example, Java code has Javadoc that documents the API.
If you read the Javadoc of the aggregate method, you will find out about the "null".

http://camel.apache.org/maven/current/camel-core/apidocs/org/apache/camel/processor/aggregate/AggregationStrategy.html
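
The Javadoc linked above says that the first argument of aggregate is null
the first time it is called for a group, because there is nothing to combine
with yet. Below is a minimal sketch of a guard for that case. It is plain
Java so that it runs without Camel on the classpath: Message is a
hypothetical stand-in for Exchange, and the static aggregate method only
mirrors the shape of AggregationStrategy.aggregate(oldExchange, newExchange).
It also assumes each body is the Map built by the processor.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

class NullSafeAggregation {

    /** Hypothetical stand-in for a Camel Exchange: it only carries a body. */
    static class Message {
        Object body;

        Message(Object body) {
            this.body = body;
        }
    }

    /**
     * Mirrors the shape of aggregate(oldExchange, newExchange):
     * oldMessage is null the first time a group is seen.
     */
    @SuppressWarnings("unchecked")
    static Message aggregate(Message oldMessage, Message newMessage) {
        Map<String, List<String>> incoming = (Map<String, List<String>>) newMessage.body;
        if (oldMessage == null) {
            // First message of the group: start the list instead of dereferencing null.
            List<Map<String, List<String>>> collected = new ArrayList<>();
            collected.add(incoming);
            newMessage.body = collected;
            return newMessage;
        }
        // Later messages: append to the list carried by the previous result.
        ((List<Map<String, List<String>>>) oldMessage.body).add(incoming);
        return oldMessage;
    }

    public static void main(String[] args) {
        Message first = new Message(
                Collections.singletonMap("bu1", Collections.singletonList("pf1")));
        Message second = new Message(
                Collections.singletonMap("pf1", Collections.singletonList("alice")));
        Message result = aggregate(aggregate(null, first), second);
        System.out.println(result.body); // prints [{bu1=[pf1]}, {pf1=[alice]}]
    }
}
```

In a real strategy the same check would sit at the top of aggregate, with
getIn().getBody(...) and getIn().setBody(...) in place of the body field.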


On Wed, Mar 20, 2013 at 6:42 PM, Jothi <co...@gmail.com> wrote:
> Now I get a NullPointerException in my aggregator.
> [...]
> Why is there nothing in exchange?



-- 
Claus Ibsen

Re: Camel Aggregator Strategy Question

Posted by Jothi <co...@gmail.com>.
I modified my route to look like below:

       
        from("file:C:\\Users\\joe\\Desktop\\csv?noop=true")
                .setHeader("isBuToPf", simple("true"))
                .unmarshal().csv()
                .process(new RefDataProcessor())
                .to("direct:Process");

        from("file:C:\\Users\\joe\\Desktop\\csv2?noop=true")
                .setHeader("isBuToPf", simple("true"))
                .unmarshal().csv()
                .process(new RefDataProcessor())
                .to("direct:Process");

        from("direct:Process")
                .aggregate(header("isBuToPf"), new RefDataAggregationStrategy())
                .completionTimeout(3000)
                .to("direct:Processed");

Now I get a NullPointerException in my aggregator. My aggregator looks like:

    @Override
    public Exchange aggregate(Exchange exchange, Exchange exchange2) {

        final StringBuilder sb = new StringBuilder();

        final Map<String, List<String>> buToPf =
                exchange.getIn().getBody(Map.class); // NullPointer here...

        final Map<String, List<String>> pfToUser =
                exchange2.getIn().getBody(Map.class);

        for (Map.Entry<String, List<String>> entry : buToPf.entrySet()) {
            for (String pf : entry.getValue()) {
                List<String> userList = pfToUser.get(pf);
                for (String user : userList) {
                    sb.append(entry.getKey() + "," + pf + "," + user + "GO");
                }
            }
        }
        System.out.println(sb);

        exchange.getIn().setBody(sb);
        return exchange;
    }

What I do in my RefDataProcessor.java is the following:

    @Override
    public void process(Exchange exchange) throws Exception {

        final Map<String, List<String>> buToPfMap =
                new HashMap<String, List<String>>();

        final List<List<String>> data =
                (List<List<String>>) exchange.getIn().getBody();
        for (List<String> line : data) {
            putToMap(line.get(0), line.get(1), buToPfMap);
        }

        System.out.println(buToPfMap);

        exchange.getIn().setBody(buToPfMap);
    }

Why is there nothing in exchange?
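
The putToMap helper is not shown in the message. Here is a minimal sketch of
what such a helper might look like, assuming it appends the second column to
a list kept under the first column. RefDataRows and toMap are hypothetical
names added for illustration; the rows have the List<List<String>> shape
that the processor casts the unmarshalled CSV body to.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class RefDataRows {

    /** Hypothetical helper: appends value to the list under key, creating the list on first use. */
    static void putToMap(String key, String value, Map<String, List<String>> target) {
        target.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Turns parsed CSV rows into a map from the first column to the second-column values. */
    static Map<String, List<String>> toMap(List<List<String>> rows) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (List<String> row : rows) {
            if (row.size() < 2) {
                continue; // skip blank or short rows instead of failing on get(1)
            }
            putToMap(row.get(0), row.get(1), result);
        }
        return result;
    }

    public static void main(String[] args) {
        List<List<String>> rows = Arrays.asList(
                Arrays.asList("bu1", "pf1"),
                Arrays.asList("bu1", "pf2"),
                Arrays.asList("bu2", "pf3"));
        System.out.println(toMap(rows)); // prints {bu1=[pf1, pf2], bu2=[pf3]}
    }
}
```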




Re: Camel Aggregator Strategy Question

Posted by Jothi <co...@gmail.com>.
I tried the following to achieve what I want, but I ran into an error.

My Route:

        from("file:C:\\Users\\joe\\Desktop\\csv?noop=true")
                .unmarshal().csv()
                .process(new RefDataProcessor())
                .to("direct:Process");

        from("file:C:\\Users\\joe\\Desktop\\csv2?noop=true")
                .unmarshal().csv()
                .process(new RefDataProcessor())
                .to("direct:Process");

        from("direct:Process")
                .aggregate(header("id"), new RefDataAggregationStrategy())
                .completionTimeout(3000)
                .to("direct:Processed");

The error that I get is the following:

[Users%5Cjoe%5CDesktop%5Ccsv] DefaultErrorHandler            ERROR Failed
delivery for (MessageId: ID-HP10007131-52648-1363799287926-0-1 on
ExchangeId: ID-HP10007131-52648-1363799287926-0-3). Exhausted after delivery
attempt: 1 caught: org.apache.camel.CamelExchangeException: Invalid
correlation key. Exchange[null]

What should I understand from the invalid correlation key error message?
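
This error usually means the correlation expression evaluated to null or
empty. Here header("id") has nothing to return, since neither route sets an
"id" header, so the aggregator cannot decide which group the exchange
belongs to. The following is a plain-Java model of that behaviour, not
Camel's implementation: Message and group are hypothetical names, and the
exception type is chosen only for the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class CorrelationModel {

    /** Hypothetical stand-in for an exchange: a body plus string headers. */
    static class Message {
        final Map<String, String> headers = new HashMap<>();
        final String body;

        Message(String body) {
            this.body = body;
        }

        Message header(String name, String value) {
            headers.put(name, value);
            return this;
        }
    }

    /** Groups bodies by the value of one header; a missing or empty value is rejected. */
    static Map<String, List<String>> group(List<Message> messages, String headerName) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (Message message : messages) {
            String key = message.headers.get(headerName);
            if (key == null || key.isEmpty()) {
                throw new IllegalStateException("Invalid correlation key");
            }
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(message.body);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<Message> messages = Arrays.asList(
                new Message("csv1").header("isBuToPf", "true"),
                new Message("csv2").header("isBuToPf", "true"));
        System.out.println(group(messages, "isBuToPf")); // prints {true=[csv1, csv2]}
        try {
            group(messages, "id"); // no message carries an "id" header
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // prints Invalid correlation key
        }
    }
}
```

Setting the header on every message before it reaches the aggregator, or
correlating on something every message already carries, avoids the error.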




Re: Camel Aggregator Strategy Question

Posted by Pontus Ullgren <ul...@gmail.com>.
Add a transformation from CSV to Map object before the aggregator.

Then let the aggregator store a list of maps, or a map of maps, in the
exchange body.

You can then use this list in any subsequent transformation.

It all depends on what it is you intend to do with the data after it
has been aggregated.

// Pontus
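
One such subsequent transformation, sketched in plain Java: joining two of
the aggregated maps into CSV-style lines. It assumes the two maps have
already been picked out of the aggregated list and follow the key -> pf and
pf -> user shape used elsewhere in this thread. JoinAggregatedMaps and join
are hypothetical names. A pf with no entry in the second map produces no
lines rather than a NullPointerException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class JoinAggregatedMaps {

    /** Joins key -> pf and pf -> user into "key,pf,user" lines. */
    static List<String> join(Map<String, List<String>> buToPf,
                             Map<String, List<String>> pfToUser) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, List<String>> entry : buToPf.entrySet()) {
            for (String pf : entry.getValue()) {
                // getOrDefault keeps the loop safe when a pf has no users.
                List<String> users = pfToUser.getOrDefault(pf, Collections.<String>emptyList());
                for (String user : users) {
                    lines.add(entry.getKey() + "," + pf + "," + user);
                }
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, List<String>> buToPf = new LinkedHashMap<>();
        buToPf.put("bu1", Arrays.asList("pf1", "pf2"));
        Map<String, List<String>> pfToUser = new LinkedHashMap<>();
        pfToUser.put("pf1", Arrays.asList("alice", "bob")); // pf2 has no users
        for (String line : join(buToPf, pfToUser)) {
            System.out.println(line); // prints bu1,pf1,alice then bu1,pf1,bob
        }
    }
}
```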

On Wed, Mar 20, 2013 at 10:48 AM, Jothi <co...@gmail.com> wrote:
> I need to poll for these 3 files and get 3 Map objects out of
> those 3 CSV files. [...] Is there a better way to do this?

Re: Camel Aggregator Strategy Question

Posted by Jothi <co...@gmail.com>.
Thanks for the reply. 

Would I be able to do the following? The 3 CSV files will contain different
information. I need to poll for these 3 files and get 3 Map objects out of
those 3 CSV files. I've seen the aggregation examples in the Camel
documentation, but there the old and the new exchange are combined. If I
did that, I would end up with one Map object, which is not what I want. If I
use the headers to store the Maps, am I misusing Camel headers? Is there a
better way to do this?

Regards,
Joe




Re: Camel Aggregator Strategy Question

Posted by Willem jiang <wi...@gmail.com>.
If the three files are in the same directory, you can just use one route with the aggregator.
If the files are in different directories, you can use three routes like this:

from("file:/csv1/").to("direct:merge");
from("file:/csv2/").to("direct:merge");
from("file:/csv3/").to("direct:merge");
from("direct:merge").aggregate(…)


--  
Willem Jiang


On Tuesday, March 19, 2013 at 5:12 AM, Pontus Ullgren wrote:

> Hello,
>  
> The file component will by default poll all files that are available
> in the directory.
> You could then use the aggregator EIP [1] and your own custom
> aggregation strategy to combine the files.
>  
> Possibly you could also use a custom route policy to restrict the
> polling of files until all files are available.
>  
> [1] http://camel.apache.org/aggregator2.html
>  
> Just my $0.02
> // Pontus

Re: Camel Aggregator Strategy Question

Posted by Pontus Ullgren <ul...@gmail.com>.
Hello,

The file component will by default poll all files that are available
in the directory.
You could then use the aggregator EIP [1] and your own custom
aggregation strategy to combine the files.

Possibly you could also use a custom route policy to restrict the
polling of files until all files are available.

[1] http://camel.apache.org/aggregator2.html

Just my $0.02
// Pontus
