You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Claus Ibsen <cl...@gmail.com> on 2009/10/28 08:15:13 UTC

Discussion - Route Policy feature in Camel 2.1

Hi

A bit of background is CAMEL-1048
https://issues.apache.org/activemq/browse/CAMEL-1048

I am looking into this how to implement this nicely in Camel.

CAMEL-1048 is maybe a bit too much at current requirements from Camel users.
What they are looking for is to be able to dynamic throttle consumers.

Apache CXF and ServiceMix has such a feature specially build in their
JMS components.
What it does is that it can stop the JMS listener when there are too
many messages in flight.

But on the other hand we have also had Camel users wanting to dynamic
start/stop consumers depending on some flag of some sort.
So there is also grounds to make this a general solution that can
cater both use cases.

I do wonder how to move forward.
In CAMEL-1048 there is an example using requires to associate a route
that this predicate must be true for the route to be running.
I do not like the naming requires or require. And having it in the
fluent builder / DSL makes it fixed.

Wonder if we should name such a configuration a RoutePolicy so you can
configure it as

<route routePolicy="myRoutePolicy">
   <from .../>
  ...
</route>

<bean id="myRoutePolicy" class=...>
   <property name="maxInflightExchanges" value="500"/>
</bean>

This is quite flexible as we can just offer a SPI interface for the
RoutePolicy and have people implement it as how they like it.
For example as above something that is controlled by the number of
current in flight exchanges.

Others can control it by CPU utliization, a switch, timer based etc.


In the Java DSL its

from("xxx").routePolicy(myRoutePolicy).to(yyyy);




-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Discussion - Route Policy feature in Camel 2.1

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I have committed a first cut of this feature in rev 830861.

Please feel free to take a look.

And feedback on the locking in ThrottlingRoutePolicy class is welcome.
I did consider using try locks in case there was a race, as after all
we just want at least one of them to acquire the lock and invoke the
suspension / resume.
There may be clever ways to archive this.

For example running it by throttling this route
    <bean id="myPolicy" class="org.apache.camel.impl.ThrottlingRoutePolicy">
        <property name="maxInflightExchanges" value="16"/>
        <property name="resumePercentOfMax" value="25"/>
    </bean>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route routePolicyRef="myPolicy">
            <from uri="activemq:queue:foo?concurrentConsumers=20"/>
            <transacted/>
            <delay><constant>100</constant></delay>
            <to uri="log:foo?groupSize=10"/>
            <to uri="mock:result"/>
        </route>
    </camelContext>

Will yield this output. Yes its throttling very often but this is due
to the low number of concurrent consumers and the fact the exchanges
are routed at a constant pace.



2009-10-29 10:02:12,788 [main           ] INFO  DefaultCamelContext
        - Apache Camel 2.1-SNAPSHOT (CamelContext:camelContext)
started
2009-10-29 10:02:13,179 [enerContainer-6] INFO  foo
        - Received: 10 messages so far. Last group took: 4 millis
which is: 2,500 messages per second. average: 2,500
2009-10-29 10:02:13,184 [nerContainer-19] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,193 [nerContainer-13] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,203 [nerContainer-16] INFO  foo
        - Received: 20 messages so far. Last group took: 25 millis
which is: 400 messages per second. average: 689.655
2009-10-29 10:02:13,302 [nerContainer-10] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,320 [enerContainer-9] INFO  foo
        - Received: 30 messages so far. Last group took: 117 millis
which is: 85.47 messages per second. average: 205.479
2009-10-29 10:02:13,333 [nerContainer-18] INFO  foo
        - Received: 40 messages so far. Last group took: 13 millis
which is: 769.231 messages per second. average: 251.572
2009-10-29 10:02:13,344 [enerContainer-3] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,444 [nerContainer-10] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,446 [nerContainer-15] INFO  foo
        - Received: 50 messages so far. Last group took: 113 millis
which is: 88.496 messages per second. average: 183.824
2009-10-29 10:02:13,455 [enerContainer-3] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,457 [nerContainer-11] INFO  foo
        - Received: 60 messages so far. Last group took: 11 millis
which is: 909.091 messages per second. average: 212.014
2009-10-29 10:02:13,560 [enerContainer-2] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,570 [nerContainer-13] INFO  foo
        - Received: 70 messages so far. Last group took: 113 millis
which is: 88.496 messages per second. average: 176.768
2009-10-29 10:02:13,574 [nerContainer-11] INFO  foo
        - Received: 80 messages so far. Last group took: 4 millis
which is: 2,500 messages per second. average: 200
2009-10-29 10:02:13,574 [nerContainer-16] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,660 [main           ] INFO  MockEndpoint
        - Asserting: Endpoint[mock://result] is satisfied
2009-10-29 10:02:13,681 [nerContainer-20] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,693 [nerContainer-13] INFO  foo
        - Received: 90 messages so far. Last group took: 119 millis
which is: 84.034 messages per second. average: 173.41
2009-10-29 10:02:13,701 [enerContainer-5] INFO  foo
        - Received: 100 messages so far. Last group took: 8 millis
which is: 1,250 messages per second. average: 189.753
2009-10-29 10:02:13,725 [nerContainer-16] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,809 [enerContainer-2] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,826 [enerContainer-4] INFO  foo
        - Received: 110 messages so far. Last group took: 125 millis
which is: 80 messages per second. average: 168.712
2009-10-29 10:02:13,837 [enerContainer-5] INFO  foo
        - Received: 120 messages so far. Last group took: 11 millis
which is: 909.091 messages per second. average: 180.995
2009-10-29 10:02:13,851 [enerContainer-3] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,941 [nerContainer-15] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:13,952 [nerContainer-19] INFO  foo
        - Received: 130 messages so far. Last group took: 115 millis
which is: 86.957 messages per second. average: 167.095
2009-10-29 10:02:13,957 [enerContainer-9] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:13,963 [nerContainer-16] INFO  foo
        - Received: 140 messages so far. Last group took: 11 millis
which is: 909.091 messages per second. average: 177.44
2009-10-29 10:02:14,071 [nerContainer-18] INFO  foo
        - Received: 150 messages so far. Last group took: 108 millis
which is: 92.593 messages per second. average: 167.224
2009-10-29 10:02:14,171 [enerContainer-6] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:14,177 [nerContainer-14] INFO  foo
        - Received: 160 messages so far. Last group took: 106 millis
which is: 94.34 messages per second. average: 159.521
2009-10-29 10:02:14,198 [enerContainer-9] INFO  foo
        - Received: 170 messages so far. Last group took: 21 millis
which is: 476.19 messages per second. average: 166.016
2009-10-29 10:02:14,199 [nerContainer-13] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:14,306 [nerContainer-14] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 20 > 16 inflight exchange by
suspending consumer.
2009-10-29 10:02:14,314 [nerContainer-12] INFO  foo
        - Received: 180 messages so far. Last group took: 116 millis
which is: 86.207 messages per second. average: 157.895
2009-10-29 10:02:14,317 [nerContainer-18] INFO  foo
        - Received: 190 messages so far. Last group took: 3 millis
which is: 3,333.333 messages per second. average: 166.229
2009-10-29 10:02:14,317 [nerContainer-10] INFO  ThrottlingRoutePolicy
        - Throtteling consumer: 4 <= 4 inflight exchange by resuming
consumer.
2009-10-29 10:02:14,432 [enerContainer-3] INFO  foo
        - Received: 200 messages so far. Last group took: 115 millis
which is: 86.957 messages per second. average: 158.983
2009-10-29 10:02:14,434 [main           ] INFO  DefaultCamelContext
        - Apache Camel 2.1-SNAPSHOT (CamelContext:camelContext) is
stopping

On Wed, Oct 28, 2009 at 3:15 PM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi
>
> I have done some experiment and I am setting down for something like this
>
> Where we can configure a route policy to use.
> In this case we use a throtteling policy that is based on the number
> of in flight exchanges.
>
> In this example at most 16 in flights messages is aimed for. It can go higher.
> But when the policy is investigated it will react if the current in
> flight > max.
> If so it will suspend/stop the consumer. And when it goes down to 25%
> of 16 (=4) then it will resume / start the consumer again.
>
> Apache CXF have already this feature. So its inspired from this project.
>
>    <bean id="myPolicy" class="org.apache.camel.impl.ThrottelingRoutePolicy">
>        <property name="maxInflightExchanges" value="16"/>
>        <property name="reconnectPercentOfMax" value="25"/>
>    </bean>
>
>    <camelContext xmlns="http://camel.apache.org/schema/spring">
>        <route routePolicyRef="myPolicy">
>            <from uri="activemq:queue:foo?concurrentConsumers=20"/>
>            <transacted/>
>            <delay><constant>100</constant></delay>
>            <to uri="log:foo?groupSize=10"/>
>            <to uri="mock:result"/>
>        </route>
>    </camelContext>
>
>
>
> You can of course implement your own route policy and do whatever you like.
>
> The route policy is this interface. I amy want to adjust the naming of
> that method and maybe change it to be more routish.
> Its based on a callback that is invoked when an Exchange is done being
> routed. So maybe something as:
>
>      void onRouteDone(Route route, Exchange exchange);
>
> is better as its more clear to the point. And you can get the Consumer
> from the Route so you can grab it and susped/stop it on the fly.
>
>
> public interface RoutePolicy {
>
>    /**
>     * Callback invokes when a {@link Consumer} have consumed and
> created an {@link Exchange}
>     *
>     * @param consumer  the consumer
>     * @param exchange  the created exchange
>     */
>    void onConsume(Consumer consumer, Exchange exchange);
> }
>
>
>
>
> On Wed, Oct 28, 2009 at 8:15 AM, Claus Ibsen <cl...@gmail.com> wrote:
>> Hi
>>
>> A bit of background is CAMEL-1048
>> https://issues.apache.org/activemq/browse/CAMEL-1048
>>
>> I am looking into this how to implement this nicely in Camel.
>>
>> CAMEL-1048 is maybe a bit too much at current requirements from Camel users.
>> What they are looking for is to be able to dynamic throttle consumers.
>>
>> Apache CXF and ServiceMix has such a feature specially build in their
>> JMS components.
>> What it does is that it can stop the JMS listener when there are too
>> many messages in flight.
>>
>> But on the other hand we have also had Camel users wanting to dynamic
>> start/stop consumers depending on some flag of some sort.
>> So there is also grounds to make this a general solution that can
>> cater both use cases.
>>
>> I do wonder how to move forward.
>> In CAMEL-1048 there is an example using requires to associate a route
>> that this predicate must be true for the route to be running.
>> I do not like the naming requires or require. And having it in the
>> fluent builder / DSL makes it fixed.
>>
>> Wonder if we should name such a configuration a RoutePolicy so you can
>> configure it as
>>
>> <route routePolicy="myRoutePolicy">
>>   <from .../>
>>  ...
>> </route>
>>
>> <bean id="myRoutePolicy" class=...>
>>   <property name="maxInflightExchanges" value="500"/>
>> </bean>
>>
>> This is quite flexible as we can just offer a SPI interface for the
>> RoutePolicy and have people implement it as how they like it.
>> For example as above something that is controlled by the number of
>> current in flight exchanges.
>>
>> Others can control it by CPU utliization, a switch, timer based etc.
>>
>>
>> In the Java DSL its
>>
>> from("xxx").routePolicy(myRoutePolicy).to(yyyy);
>>
>>
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Author of Camel in Action: http://www.manning.com/ibsen/
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Discussion - Route Policy feature in Camel 2.1

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I have done some experiment and I am setting down for something like this

Where we can configure a route policy to use.
In this case we use a throtteling policy that is based on the number
of in flight exchanges.

In this example at most 16 in flights messages is aimed for. It can go higher.
But when the policy is investigated it will react if the current in
flight > max.
If so it will suspend/stop the consumer. And when it goes down to 25%
of 16 (=4) then it will resume / start the consumer again.

Apache CXF have already this feature. So its inspired from this project.

    <bean id="myPolicy" class="org.apache.camel.impl.ThrottelingRoutePolicy">
        <property name="maxInflightExchanges" value="16"/>
        <property name="reconnectPercentOfMax" value="25"/>
    </bean>

    <camelContext xmlns="http://camel.apache.org/schema/spring">
        <route routePolicyRef="myPolicy">
            <from uri="activemq:queue:foo?concurrentConsumers=20"/>
            <transacted/>
            <delay><constant>100</constant></delay>
            <to uri="log:foo?groupSize=10"/>
            <to uri="mock:result"/>
        </route>
    </camelContext>



You can of course implement your own route policy and do whatever you like.

The route policy is this interface. I amy want to adjust the naming of
that method and maybe change it to be more routish.
Its based on a callback that is invoked when an Exchange is done being
routed. So maybe something as:

      void onRouteDone(Route route, Exchange exchange);

is better as its more clear to the point. And you can get the Consumer
from the Route so you can grab it and susped/stop it on the fly.


public interface RoutePolicy {

    /**
     * Callback invokes when a {@link Consumer} have consumed and
created an {@link Exchange}
     *
     * @param consumer  the consumer
     * @param exchange  the created exchange
     */
    void onConsume(Consumer consumer, Exchange exchange);
}




On Wed, Oct 28, 2009 at 8:15 AM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi
>
> A bit of background is CAMEL-1048
> https://issues.apache.org/activemq/browse/CAMEL-1048
>
> I am looking into this how to implement this nicely in Camel.
>
> CAMEL-1048 is maybe a bit too much at current requirements from Camel users.
> What they are looking for is to be able to dynamic throttle consumers.
>
> Apache CXF and ServiceMix has such a feature specially build in their
> JMS components.
> What it does is that it can stop the JMS listener when there are too
> many messages in flight.
>
> But on the other hand we have also had Camel users wanting to dynamic
> start/stop consumers depending on some flag of some sort.
> So there is also grounds to make this a general solution that can
> cater both use cases.
>
> I do wonder how to move forward.
> In CAMEL-1048 there is an example using requires to associate a route
> that this predicate must be true for the route to be running.
> I do not like the naming requires or require. And having it in the
> fluent builder / DSL makes it fixed.
>
> Wonder if we should name such a configuration a RoutePolicy so you can
> configure it as
>
> <route routePolicy="myRoutePolicy">
>   <from .../>
>  ...
> </route>
>
> <bean id="myRoutePolicy" class=...>
>   <property name="maxInflightExchanges" value="500"/>
> </bean>
>
> This is quite flexible as we can just offer a SPI interface for the
> RoutePolicy and have people implement it as how they like it.
> For example as above something that is controlled by the number of
> current in flight exchanges.
>
> Others can control it by CPU utliization, a switch, timer based etc.
>
>
> In the Java DSL its
>
> from("xxx").routePolicy(myRoutePolicy).to(yyyy);
>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Discussion - Route Policy feature in Camel 2.1

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I have implemented this feature into the trunk now.

You can checkout the documentation at
http://camel.apache.org/routepolicy.html

And I have also added an example you can play with
./examples/camel-example-route-throttling

Which also has a bit of documentation at
http://cwiki.apache.org/confluence/display/CAMEL/Route+Throttling+Example


On Thu, Oct 29, 2009 at 12:40 PM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi
>
> I recently added an example as well based on throttling a jms route.
>
> camel-example-route-throttling
>
> There is a README.txt with instructions to run it.
>
>
>
> On Wed, Oct 28, 2009 at 8:15 AM, Claus Ibsen <cl...@gmail.com> wrote:
>> Hi
>>
>> A bit of background is CAMEL-1048
>> https://issues.apache.org/activemq/browse/CAMEL-1048
>>
>> I am looking into this how to implement this nicely in Camel.
>>
>> CAMEL-1048 is maybe a bit too much at current requirements from Camel users.
>> What they are looking for is to be able to dynamic throttle consumers.
>>
>> Apache CXF and ServiceMix has such a feature specially build in their
>> JMS components.
>> What it does is that it can stop the JMS listener when there are too
>> many messages in flight.
>>
>> But on the other hand we have also had Camel users wanting to dynamic
>> start/stop consumers depending on some flag of some sort.
>> So there is also grounds to make this a general solution that can
>> cater both use cases.
>>
>> I do wonder how to move forward.
>> In CAMEL-1048 there is an example using requires to associate a route
>> that this predicate must be true for the route to be running.
>> I do not like the naming requires or require. And having it in the
>> fluent builder / DSL makes it fixed.
>>
>> Wonder if we should name such a configuration a RoutePolicy so you can
>> configure it as
>>
>> <route routePolicy="myRoutePolicy">
>>   <from .../>
>>  ...
>> </route>
>>
>> <bean id="myRoutePolicy" class=...>
>>   <property name="maxInflightExchanges" value="500"/>
>> </bean>
>>
>> This is quite flexible as we can just offer a SPI interface for the
>> RoutePolicy and have people implement it as how they like it.
>> For example as above something that is controlled by the number of
>> current in flight exchanges.
>>
>> Others can control it by CPU utliization, a switch, timer based etc.
>>
>>
>> In the Java DSL its
>>
>> from("xxx").routePolicy(myRoutePolicy).to(yyyy);
>>
>>
>>
>>
>> --
>> Claus Ibsen
>> Apache Camel Committer
>>
>> Author of Camel in Action: http://www.manning.com/ibsen/
>> Open Source Integration: http://fusesource.com
>> Blog: http://davsclaus.blogspot.com/
>> Twitter: http://twitter.com/davsclaus
>>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus

Re: Discussion - Route Policy feature in Camel 2.1

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I recently added an example as well based on throttling a jms route.

camel-example-route-throttling

There is a README.txt with instructions to run it.



On Wed, Oct 28, 2009 at 8:15 AM, Claus Ibsen <cl...@gmail.com> wrote:
> Hi
>
> A bit of background is CAMEL-1048
> https://issues.apache.org/activemq/browse/CAMEL-1048
>
> I am looking into this how to implement this nicely in Camel.
>
> CAMEL-1048 is maybe a bit too much at current requirements from Camel users.
> What they are looking for is to be able to dynamic throttle consumers.
>
> Apache CXF and ServiceMix has such a feature specially build in their
> JMS components.
> What it does is that it can stop the JMS listener when there are too
> many messages in flight.
>
> But on the other hand we have also had Camel users wanting to dynamic
> start/stop consumers depending on some flag of some sort.
> So there is also grounds to make this a general solution that can
> cater both use cases.
>
> I do wonder how to move forward.
> In CAMEL-1048 there is an example using requires to associate a route
> that this predicate must be true for the route to be running.
> I do not like the naming requires or require. And having it in the
> fluent builder / DSL makes it fixed.
>
> Wonder if we should name such a configuration a RoutePolicy so you can
> configure it as
>
> <route routePolicy="myRoutePolicy">
>   <from .../>
>  ...
> </route>
>
> <bean id="myRoutePolicy" class=...>
>   <property name="maxInflightExchanges" value="500"/>
> </bean>
>
> This is quite flexible as we can just offer a SPI interface for the
> RoutePolicy and have people implement it as how they like it.
> For example as above something that is controlled by the number of
> current in flight exchanges.
>
> Others can control it by CPU utliization, a switch, timer based etc.
>
>
> In the Java DSL its
>
> from("xxx").routePolicy(myRoutePolicy).to(yyyy);
>
>
>
>
> --
> Claus Ibsen
> Apache Camel Committer
>
> Author of Camel in Action: http://www.manning.com/ibsen/
> Open Source Integration: http://fusesource.com
> Blog: http://davsclaus.blogspot.com/
> Twitter: http://twitter.com/davsclaus
>



-- 
Claus Ibsen
Apache Camel Committer

Author of Camel in Action: http://www.manning.com/ibsen/
Open Source Integration: http://fusesource.com
Blog: http://davsclaus.blogspot.com/
Twitter: http://twitter.com/davsclaus