Posted to users@camel.apache.org by Andre Piwoni <ap...@yahoo.com> on 2011/08/25 19:38:38 UTC

How to stop all Exchanges based on criteria like header value?

I have an instance of a job that is handled by multiple processors. All
Exchanges for this instance of a job can be identified by a value of JOB_ID
header. What is the best way to terminate processing of an instance of such
a job?

I can think of the following things that need to happen:
(1) Remove pending messages with a specified value of JOB_ID header from JMS
queue
(2) Stop all Exchanges with a specified value of JOB_ID header
(3) Application specific accounting

I was able to remove pending messages from the queue(s), but I'm not sure
how to stop all Exchanges for a job. It seems this can be accomplished using
interceptors, and while it's easy to add these interceptors statically, I'm
not sure what the best way is to add and remove them dynamically.
Terminating instances of a job would not be a common operation, so I'd
rather not define interceptors statically.

I'd like to add and remove an interceptor dynamically for most, if not all,
routes, as I would do statically for a single route:

intercept().when(header("JOB_ID").isEqualTo(JobId)).stop();

I'm also wondering if there's a shortcut for adding a "global" interceptor
that applies to many, if not all, route definitions, be it statically or
dynamically?
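
One way to sidestep adding and removing interceptors at runtime is to keep
the interceptor static and make the condition it checks dynamic. Below is a
minimal, framework-independent sketch of that idea. It assumes the message
headers are available as a Map; the TerminatedJobs class and its method names
are hypothetical, not Camel API. In Camel, the isTerminated check would
presumably be wrapped in a Predicate and passed to when(...).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical registry of terminated job IDs (illustrative, not Camel API).
class TerminatedJobs {
    // Concurrent set: routes read it while a management thread updates it.
    private final Set<String> jobIds = ConcurrentHashMap.newKeySet();

    // Called when a request to terminate a job instance arrives.
    public void terminate(String jobId) {
        jobIds.add(jobId);
    }

    // Called once the job is fully cleaned up, so the set does not grow forever.
    public void clear(String jobId) {
        jobIds.remove(jobId);
    }

    // True when the JOB_ID header names a job that has been terminated.
    public boolean isTerminated(Map<String, Object> headers) {
        Object jobId = headers.get("JOB_ID");
        return jobId != null && jobIds.contains(jobId.toString());
    }

    public static void main(String[] args) {
        TerminatedJobs jobs = new TerminatedJobs();
        Map<String, Object> headers = new HashMap<>();
        headers.put("JOB_ID", "100");

        System.out.println(jobs.isTerminated(headers)); // before the request
        jobs.terminate("100");
        System.out.println(jobs.isTerminated(headers)); // after the request
    }
}
```

With this shape, terminating a job is a matter of calling terminate(jobId);
no route definition has to change.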

Appreciate any suggestions
Andre Piwoni


--
View this message in context: http://camel.465427.n5.nabble.com/How-to-stop-all-Exchanges-based-on-criteria-like-header-value-tp4735364p4735364.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: How to stop all Exchanges based on criteria like header value?

Posted by Andre Piwoni <ap...@yahoo.com>.
Don,

Let me re-state your suggestion because I'm not sure I've got it right.

(1) Add a Processor that creates the header "StopFlag".

I assume you mean a Processor that handles/consumes requests to stop a
specific instance of a job based on the desired conditions, and not the
interceptor processor wrapped in an InterceptStrategy?

(2) Add when(header("StopFlag").isNotNull()).stop() to my route
definition(s). I also assume that you don't mean using the "when" predicate
with a route interceptor?

If my assumptions are correct, then this solution handles various stop
conditions but defers the actual termination of the Exchange to the route
level, because not every route may need to stop an Exchange even when a
stop condition is present.

In step (2) I would still have to know the specific JOB_ID, because I need
to stop one of many Exchanges going through the route. This could be done
with a bean for the "when" predicate that evaluates dynamic criteria, not
just "StopFlag", but this means that the Processor in step (1) has to share
this information with that bean.

All in all, it seems that in order to terminate processing of an Exchange as
soon as possible I need to use either an InterceptStrategy or an interceptor
in the route definition; otherwise, as I mentioned earlier, there's no
guarantee that the termination request occurs before a "regular"
(non-interceptor) predicate is evaluated.

I guess I can implement some exchange-terminating InterceptStrategy with a
List of Predicate conditions that can be added and removed dynamically, and
have the InterceptStrategy also implement Processor so that it consumes
termination requests and manages the conditions. Alternatively, I could
implement all of this with a route interceptor in step (2).
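
The "list of conditions that can be added and removed dynamically" part can
be sketched independently of Camel. This is only an illustration under
assumptions: headers are modelled as a Map, java.util.function.Predicate
stands in for Camel's Predicate, and StopConditions and jobIdEquals are
hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Predicate;

// Hypothetical holder for stop conditions (illustrative, not a Camel class).
class StopConditions {
    // Copy-on-write list: lock-free reads on the message path, rare writes.
    private final List<Predicate<Map<String, Object>>> conditions =
            new CopyOnWriteArrayList<>();

    public void add(Predicate<Map<String, Object>> condition) {
        conditions.add(condition);
    }

    // Removal is by object identity, so callers keep the reference they added.
    public boolean remove(Predicate<Map<String, Object>> condition) {
        return conditions.remove(condition);
    }

    // An exchange should stop as soon as any registered condition matches.
    public boolean shouldStop(Map<String, Object> headers) {
        for (Predicate<Map<String, Object>> condition : conditions) {
            if (condition.test(headers)) {
                return true;
            }
        }
        return false;
    }

    // Convenience factory for the JOB_ID case discussed in this thread.
    public static Predicate<Map<String, Object>> jobIdEquals(String jobId) {
        return headers -> jobId.equals(String.valueOf(headers.get("JOB_ID")));
    }
}
```

A single shouldStop call could then back whichever interceptor ends up being
used, while the processor that consumes termination requests only calls add
and remove.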

Thanks,
Andre


Re: How to stop all Exchanges based on criteria like header value?

Posted by Donald Whytock <dw...@gmail.com>.
You could have a Processor that adds a header "StopFlag" under
whatever conditions are needed, and then a
when(header("StopFlag").isNotNull()).stop().  The Processor can
evaluate dynamic criteria, such as a particular value for a header, or
it can even be a DelegateProcessor that holds a no-op unless
something comes up.
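
A rough, Camel-free sketch of that two-part idea, assuming headers are a
plain Map: flag() plays the role of the Processor and isFlagged() the role of
header("StopFlag").isNotNull(). The class and method names are made up for
illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Predicate;

// Hypothetical stand-in for the Processor plus the "when" check.
class StopFlagSketch {
    static final String STOP_FLAG = "StopFlag";

    // Processor role: set the flag only when the dynamic criteria match.
    static void flag(Map<String, Object> headers,
                     Predicate<Map<String, Object>> criteria) {
        if (criteria.test(headers)) {
            headers.put(STOP_FLAG, Boolean.TRUE);
        }
    }

    // Predicate role: the route stops when the flag header is present.
    static boolean isFlagged(Map<String, Object> headers) {
        return headers.get(STOP_FLAG) != null;
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put("JOB_ID", "100");

        // Criteria can be swapped at runtime without touching the check.
        flag(headers, h -> "100".equals(h.get("JOB_ID")));
        System.out.println(isFlagged(headers));
    }
}
```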

Don

On Fri, Aug 26, 2011 at 12:16 PM, Andre Piwoni <ap...@yahoo.com> wrote:
> Claus,
>
> I should be able to add InterceptStrategy to CamelContext and set
> Exchange.ROUTE_STOP if that's the only way.
>
> Thanks,
> Andre
>

Re: How to stop all Exchanges based on criteria like header value?

Posted by Andre Piwoni <ap...@yahoo.com>.
Claus,

I should be able to add InterceptStrategy to CamelContext and set
Exchange.ROUTE_STOP if that's the only way.

Thanks,
Andre


Re: How to stop all Exchanges based on criteria like header value?

Posted by Claus Ibsen <cl...@gmail.com>.
You could possibly use a context-scoped interceptor and then mark the
exchange to stop if the job is to be stopped.
See the source code of the StopProcessor for how to mark the exchange to stop.


On Fri, Aug 26, 2011 at 1:07 AM, Andre Piwoni <ap...@yahoo.com> wrote:
> Ashwin,
>
> Thank you for your response.
>
> If I understand you right, you suggest using approaches similar to the ones
> below:
>
> from(uriA).filter(not(TerminatedJobPredicate)).to(uriB)
>
> or, with a processor:
>
> from(uriA).process(FilteringProcessor).to(uriB)
>
> It may not be that big of an issue for me to include filter or filtering
> processor statically, even though it would be nice to alter existing route
> definitions, be it adding/removing processors, filters or interceptors.
>
> What if I have to change my filtering logic dynamically based on a varying
> set of criteria? Would I be able to design some compound predicate where I
> could add/remove predicates on the fly without updating the route
> definition? If that's possible, then it would be a more flexible solution.
>
> My problem with the filter and filtering processor approaches, just like
> with the interceptor approach, is that I have to add a filter or filtering
> processor to every route definition that I want to filter. There does not
> seem to be a way to apply it to all route definitions in one shot.
>
> The biggest problem that I have with using a filter or filtering processor
> can be illustrated with the example below:
>
> from(uriA).filter(not(TerminatedJobPredicate)).process(ProcessorA).process(LongRunningProcessorB).to(uriB)
>
> Let's say I terminate JOB_ID=100 while ProcessorA is executing. With the
> filter and filtering processor approaches, unlike with an interceptor,
> LongRunningProcessorB would still execute, and this is exactly what I'm
> trying to avoid. This is one of the reasons I have looked at interceptors:
> an interceptor would allow me to terminate LongRunningProcessorB for
> JOB_ID=100. So I understand your argument that my method may be too
> involved and complex, but I disagree that it is unnecessary :-) Ultimately,
> I'm trying to terminate costly processing as soon as possible.
>
> Cheers,
> Andre



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: How to stop all Exchanges based on criteria like header value?

Posted by Andre Piwoni <ap...@yahoo.com>.
Ashwin,

Thank you for your response.

If I understand you right, you suggest using approaches similar to the ones
below:

from(uriA).filter(not(TerminatedJobPredicate)).to(uriB)

or, with a processor:

from(uriA).process(FilteringProcessor).to(uriB)

It may not be that big of an issue for me to include filter or filtering
processor statically, even though it would be nice to alter existing route
definitions, be it adding/removing processors, filters or interceptors.

What if I have to change my filtering logic dynamically based on a varying
set of criteria? Would I be able to design some compound predicate where I
could add/remove predicates on the fly without updating the route
definition? If that's possible, then it would be a more flexible solution.

My problem with the filter and filtering processor approaches, just like
with the interceptor approach, is that I have to add a filter or filtering
processor to every route definition that I want to filter. There does not
seem to be a way to apply it to all route definitions in one shot.

The biggest problem that I have with using a filter or filtering processor
can be illustrated with the example below:

from(uriA).filter(not(TerminatedJobPredicate)).process(ProcessorA).process(LongRunningProcessorB).to(uriB)

Let's say I terminate JOB_ID=100 while ProcessorA is executing. With the
filter and filtering processor approaches, unlike with an interceptor,
LongRunningProcessorB would still execute, and this is exactly what I'm
trying to avoid. This is one of the reasons I have looked at interceptors:
an interceptor would allow me to terminate LongRunningProcessorB for
JOB_ID=100. So I understand your argument that my method may be too
involved and complex, but I disagree that it is unnecessary :-) Ultimately,
I'm trying to terminate costly processing as soon as possible.
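
The difference between checking once at the start of the route and checking
before every step can be shown with a small model. This is a sketch only:
steps stand in for processors, the route is a plain list, and StepwiseStop is
a hypothetical name. It also only prevents later steps from starting; it does
not interrupt a step that is already running.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical model of a route whose stop condition is re-checked per step.
class StepwiseStop {
    // Runs the steps in order, evaluating shouldStop before each one.
    // Returns the number of steps that actually ran.
    static int run(Map<String, Object> headers,
                   List<Consumer<Map<String, Object>>> steps,
                   Predicate<Map<String, Object>> shouldStop) {
        int executed = 0;
        for (Consumer<Map<String, Object>> step : steps) {
            if (shouldStop.test(headers)) {
                break; // skip everything that has not started yet
            }
            step.accept(headers);
            executed++;
        }
        return executed;
    }

    public static void main(String[] args) {
        Set<String> terminated = ConcurrentHashMap.newKeySet();
        Map<String, Object> headers = Map.of("JOB_ID", "100");

        List<Consumer<Map<String, Object>>> steps = List.of(
                // Simulates the termination request arriving during ProcessorA.
                h -> terminated.add("100"),
                // Stands in for LongRunningProcessorB.
                h -> System.out.println("LongRunningProcessorB ran"));

        int executed = run(headers, steps,
                h -> terminated.contains(h.get("JOB_ID")));
        System.out.println("steps executed: " + executed);
    }
}
```

A filter placed only at the head of the route corresponds to evaluating
shouldStop once, before the loop, which is why it cannot react to a
termination request that arrives while ProcessorA is running.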

Cheers,
Andre




Re: How to stop all Exchanges based on criteria like header value?

Posted by Ashwin Karpe <ak...@fusesource.com>.
Hi,

Hmm, can you not use a filter to filter out messages with a given Job Id?
Instead of coming up with very complex processing, you could use a simple
processor/bean that filters out those messages and pushes the other messages
to a sub-route for downstream jobs/processes using a ProducerTemplate. That
is a simple and elegant way to do this.

You can do whatever you like in your processor/bean and maybe send the
messages with a given JobId to a dead letter queue or handle them another
way. 
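
As a rough illustration of such a processor/bean, here is a Camel-free
sketch. It assumes headers are a Map and uses two in-memory lists in place of
the real endpoints (in Camel the sends would go through a ProducerTemplate);
JobIdRouter and its fields are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical bean that separates stopped jobs from everything else.
class JobIdRouter {
    // Stand-ins for the downstream sub-route and the dead letter queue.
    final List<Map<String, Object>> downstream = new ArrayList<>();
    final List<Map<String, Object>> deadLetters = new ArrayList<>();

    private final Set<String> stoppedJobIds;

    JobIdRouter(Set<String> stoppedJobIds) {
        this.stoppedJobIds = stoppedJobIds;
    }

    // Messages of a stopped job go to the dead letter list, the rest go on.
    void route(Map<String, Object> headers) {
        Object jobId = headers.get("JOB_ID");
        if (jobId != null && stoppedJobIds.contains(jobId.toString())) {
            deadLetters.add(headers);
        } else {
            downstream.add(headers);
        }
    }
}
```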

The methods you propose in your message seem way too involved, unnecessary
and complex.

Cheers,

Ashwin...


-----
---------------------------------------------------------
Ashwin Karpe
Apache Camel Committer & Sr Principal Consultant
FUSESource (a Progress Software Corporation subsidiary)
http://fusesource.com 

Blog: http://opensourceknowledge.blogspot.com 
---------------------------------------------------------