You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by kristofsajdak <kr...@gmail.com> on 2014/04/30 00:02:27 UTC

camel-gpars

Hi,


First of all just want to say I really love the Camel Groovy dsl, being able
to use closures and access map keys as properties really cleans up the
routes

The reason I'm writing up this post is because I wanted to get some feedback
on some of the experiments I have been setting up the last couple of weeks.

I recently came across a really nice library named Gpars, in a nutshell it
implements well known concurrency concepts such as Dataflows, Actors, STM
etc


Exploring the GPars Dataflow documentation
(http://gpars.codehaus.org/Dataflow) I was really amazed by the
expressivity, composing concurrent tasks is remarkably easy :

    final Promise p1 = task { "foo" }
    final Promise p2 = task { "bar" }

    whenAllBound(p1, p2, { p1Bound, p2Bound ->
        println "$p1Bound $p2Bound" // this prints foobar
    })
    .join()


After playing around a bit with the examples I wondered how Camel would deal
with the same problem domain, so I decided to implement a simple use case :


Expose a price quote service via an http endpoint, which when invoked in
turn calls 2 other services via http : price and discount. 
The idea is to execute these services in parallel therefore reducing
latency, when both price and discount values come back the quote is
calculated and handed back to the caller

Using the Camel Groovy dsl I came up with the following :


    from("jetty:http://0.0.0.0:8080/quote")
        .setHeader(Exchange.HTTP_QUERY, { Exchange exchange ->
exchange.in.headers[Exchange.HTTP_QUERY] })
        .multicast(new
GroupedExchangeAggregationStrategy()).parallelProcessing()
        .to("direct:discounts", "direct:prices")
        .end()
        .process(
        { Exchange exchange ->
            final groupedExchange =
exchange.getProperty(Exchange.GROUPED_EXCHANGE) as Collection<Exchange>
            final price = groupedExchange.find({ it.in.headers.serviceType
== "price" }).in.body.price
            final discount = groupedExchange.find({
it.in.headers.serviceType == "discount" }).in.body.discount
            final amount = price * (1 - (discount / 100))
            exchange.out.body = amount
        })

   
from("direct:discounts").to("ahc:http://0.0.0.0:8087/discounts?bridgeEndpoint=true").unmarshal(json).setHeader("serviceType",
constant("discount"))
   
from("direct:prices").to("ahc:http://0.0.0.0:8088/prices?bridgeEndpoint=true").unmarshal(json).setHeader("serviceType",
constant("price"))


Which is not bad but the multicast aggregation strategy doesn't give you
references to the individual price and discount call results. 
Given the fact that the calls are done in parallel you can't count on the
order of Exchange list items returned by the GROUPED_EXCHANGE property

The workaround I used here was to mark each Exchange with a serviceType
header, later on in the aggregation strategy the header is used to find the
distinct price and discount values in the Exchange list

Thanks to the Groovy language features the code is still rather concise but
in my view the Camel api offers very little support for composing parallel
service calls


I decided to integrate Gpars into Camel and see whether it would improve
things, have a look at the same routes implemented using a Dataflow :


    from("jetty:http://0.0.0.0:8080/quote").task({ Exchange exchange ->

        final headers = [(Exchange.HTTP_QUERY):
exchange.in.headers[Exchange.HTTP_QUERY]]
        whenAllBound(
            template.requestBodyAndHeadersAsPromise("direct:discounts", "",
headers),
            template.requestBodyAndHeadersAsPromise("direct:prices", "",
headers),
            { Map discountResolved, Map priceResolved ->
                priceResolved.price * (1 - (discountResolved.discount /
100))
            }
        )

    })

   
from("direct:discounts").to("ahc:http://0.0.0.0:8087/discounts").unmarshal(json)
   
from("direct:prices").to("ahc:http://0.0.0.0:8088/prices").unmarshal(json)



The ProducerTemplate is extended with a requestBodyAndHeadersAsPromise
method which returns a Dataflow Promise, this can now be composed in
Dataflow functions such as whenAllBound, then etc

In the example above I am using the whenAllBound function to wait ( in a
non-blocking way ) until both requestBodyAndHeaders (price and discount)
Promises are resolved.

Unlike the aggregationStrategy the whenAllBound Closure has named variables
referring to price and discount bound values, the variables are declared in
the same order as the Promises were registered in whenAllBound.


In my view the camel-gpars integration exploits the strengths of both
frameworks and makes composing concurrent service calls far less complex.


You can find the full commented code for this along with other examples in
my camel-gpars github repo :

https://github.com/kristofsajdak/camel-gpars/blob/master/src/test/java/com/kristofsajdak/camel/gpars/camel-quote-example.groovy


Have a look and let me know what you think ?



Best regards,


Kristof




--
View this message in context: http://camel.465427.n5.nabble.com/camel-gpars-tp5750704.html
Sent from the Camel Development mailing list archive at Nabble.com.