Posted to users@camel.apache.org by mdo <ma...@gmail.com> on 2013/03/18 15:21:01 UTC

stopRoute(): Waiting as there are still X inflight and pending exchanges to complete

Hello,

I'm trying to stop a route from a processor. The route is Quartz-triggered
(via context.startRoute()). The route in question is defined like this:

        from("file:///tmp/x/?sendEmptyMessageWhenIdle=true")
                .noAutoStartup()
                .routeId(nameConsumer)
                .choice()
                    .when(body().isNull()).log("stopping route")
                        .process(new Processor() {
                            @Override
                            public void process(final Exchange exchange) throws Exception {
                                getContext().getInflightRepository().remove(exchange);
                                // Try to stop the consumer; the polling field of the
                                // ScheduledBatchPollingConsumer must be false? This doesn't work:
                                getContext().getRoute(exchange.getFromRouteId()).getConsumer().stop();
                                // Remove the route entry from the inflight repo;
                                // this brings the pending size down by 1:
                                getContext().getInflightRepository().removeRoute(exchange.getFromRouteId());
                                getContext().stopRoute(exchange.getFromRouteId());
                            }
                        })
                    .otherwise().to("seda:" + nameSeda + "?waitForTaskToComplete=Never");


The idea was to use empty messages to signal when there are no more files to
process (sendEmptyMessageWhenIdle).

The problem is that if I only call remove(exchange) and
stopRoute(exchange.getFromRouteId()), Camel complains that there are 2
remaining pending exchanges:

[Camel (camel-1) thread #3 - ShutdownTask] INFO org.apache.camel.impl.DefaultShutdownStrategy - Waiting as there are still 2 inflight and pending exchanges to complete, timeout in 299 seconds.

Removing the route entry from the inflight repository brings this number
down by 1. But the other "pending exchange" seems to remain because
getPendingExchangesSize in ScheduledBatchPollingConsumer returns 1
whenever its state is "polling". A comment there reads:

            // force at least one pending exchange if we are polling as there is a little gap
            // in the processBatch method and until an exchange gets enlisted as in-flight
            // which happens later, so we need to signal back to the shutdown strategy that
            // there is a pending exchange. When we are no longer polling, then we will return 0
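
To make the rule in that comment concrete, here is a minimal sketch. It is not the Camel source: PendingSizeSketch, pendingExchangesSize, and its parameters are made-up names for illustration. It also assumes that the processor runs on the polling thread, so the consumer would still count as "polling" while stopRoute() is called from inside the processor.

```java
class PendingSizeSketch {

    /**
     * Hypothetical stand-in for what a polling consumer reports to the
     * shutdown strategy. "queued" and "polling" are illustrative inputs.
     */
    static int pendingExchangesSize(int queued, boolean polling) {
        // While a poll is running, report at least one pending exchange,
        // as the quoted comment describes.
        if (queued == 0 && polling) {
            return 1;
        }
        return queued;
    }

    public static void main(String[] args) {
        // Assumption: a stop requested from inside the poll sees polling == true.
        System.out.println(pendingExchangesSize(0, true));  // prints 1
        // Once the poll has finished, nothing is reported as pending.
        System.out.println(pendingExchangesSize(0, false)); // prints 0
    }
}
```

Under that assumption, the reported count could not drop to 0 for as long as the processor itself is blocked waiting for the route to stop.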


I've tested with Camel 2.10/2.11. 

Any hints? Thanks in advance.
mdo






--
View this message in context: http://camel.465427.n5.nabble.com/stopRoute-Waiting-as-there-are-still-X-inflight-and-pending-exchanges-to-complete-tp5729348.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: stopRoute(): Waiting as there are still X inflight and pending exchanges to complete

Posted by Claus Ibsen <cl...@gmail.com>.
On Mon, Mar 18, 2013 at 3:34 PM, mdo <ma...@gmail.com> wrote:
> Hello Claus,
>
> thanks for your quick answer!
>
>
> Claus Ibsen-2 wrote
>> See this FAQ how to stop a route from a route
>> http://camel.apache.org/how-can-i-stop-a-route-from-a-route.html
>
> I've seen these examples but is it necessary to start a new thread? I've
> seen examples with the following procedure at a couple of places --
> including the Camel book:
>

Yes, and that is why the FAQ has been updated to show the best solution.
The source code for Camel in Action (CiA) has also been updated with a
better example that follows the FAQ.
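
Why a separate thread helps can be shown with a small model. This is a sketch using only the JDK, not Camel code: ToyRoute, stopGracefully, stopInline, and stopFromHelperThread are hypothetical names, and the toy simply gives up on timeout, which may differ from what Camel's shutdown strategy does. The idea it models is that a graceful stop waits for in-flight exchanges, so a stop called from the thread that is still processing an exchange ends up waiting on itself, while a helper thread lets the processor return first.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class StopFromRouteSketch {

    /** Toy stand-in for a route that tracks in-flight exchanges. */
    static class ToyRoute {
        private final AtomicInteger inflight = new AtomicInteger();

        void begin() { inflight.incrementAndGet(); }
        void end() { inflight.decrementAndGet(); }

        /** Waits until nothing is in flight; returns false if the timeout expires first. */
        boolean stopGracefully(long timeoutMillis) throws InterruptedException {
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
            while (inflight.get() > 0) {
                if (System.nanoTime() >= deadline) {
                    return false;
                }
                Thread.sleep(5);
            }
            return true;
        }
    }

    /** Stop requested on the processing thread: the current exchange stays in flight. */
    static boolean stopInline(long timeoutMillis) throws InterruptedException {
        ToyRoute route = new ToyRoute();
        route.begin();
        try {
            return route.stopGracefully(timeoutMillis);
        } finally {
            route.end();
        }
    }

    /** Stop requested on a helper thread: the processor returns, then the stop completes. */
    static boolean stopFromHelperThread(long timeoutMillis) throws InterruptedException {
        ToyRoute route = new ToyRoute();
        boolean[] clean = new boolean[1];
        route.begin();
        Thread stopper = new Thread(() -> {
            try {
                clean[0] = route.stopGracefully(timeoutMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        stopper.start();
        route.end();    // the processor returns and the exchange completes
        stopper.join(); // join() makes the helper's result visible here
        return clean[0];
    }

    public static void main(String[] args) throws Exception {
        System.out.println("inline stop finished cleanly: " + stopInline(200));
        System.out.println("helper-thread stop finished cleanly: " + stopFromHelperThread(2000));
    }
}
```

In a real route, the body of the helper thread would be the exchange.getContext().stopRoute(...) call, as in the FAQ.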


>
>> .process(new Processor() {
>> public void process(Exchange exchange) throws Exception {
>>     exchange.getContext().getInflightRepository().remove(exchange);
>>     exchange.getContext().stopRoute("manual");
>> }
>> });
>



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
FuseSource is now part of Red Hat
Email: cibsen@redhat.com
Web: http://fusesource.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: stopRoute(): Waiting as there are still X inflight and pending exchanges to complete

Posted by mdo <ma...@gmail.com>.
Hello Claus, 

thanks for your quick answer!


Claus Ibsen-2 wrote
> See this FAQ how to stop a route from a route
> http://camel.apache.org/how-can-i-stop-a-route-from-a-route.html

I've seen these examples but is it necessary to start a new thread? I've
seen examples with the following procedure at a couple of places --
including the Camel book:


> .process(new Processor() {
> public void process(Exchange exchange) throws Exception {
>     exchange.getContext().getInflightRepository().remove(exchange);
>     exchange.getContext().stopRoute("manual");
> }
> });







Re: stopRoute(): Waiting as there are still X inflight and pending exchanges to complete

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

See this FAQ on how to stop a route from a route:
http://camel.apache.org/how-can-i-stop-a-route-from-a-route.html



