You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by "Søren Markert (JIRA)" <ji...@apache.org> on 2011/06/24 13:37:47 UTC

[jira] [Created] (CAMEL-4149) ThrottlingInflightRoutePolicy can deadlock

ThrottlingInflightRoutePolicy can deadlock
------------------------------------------

                 Key: CAMEL-4149
                 URL: https://issues.apache.org/jira/browse/CAMEL-4149
             Project: Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.7.0
            Reporter: Søren Markert


Using ThrottlingInflightRoutePolicy can deadlock a route in some situations. The unit test pasted in below shows one such situation.

What happens is that the bottom route processes its first exchange, then suspends. Since it is suspended it will not take the next exchange from the seda queue, and so it will never check whether it should re-enable the route.

Perhaps it will work by putting the check to re-enable the route in the onExchangeBegin method, if that is called even when the route is suspended?

{code}
import org.apache.camel.Exchange;
import org.apache.camel.Produce;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.apache.camel.impl.DefaultInflightRepository;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope;
import org.apache.camel.test.CamelTestSupport;

/**
 * Reproducer for CAMEL-4149: a route guarded by a
 * ThrottlingInflightRoutePolicy that, according to the report, stops
 * consuming and never resumes.
 *
 * The test wires two routes: direct:input, which sends to seda:hey (listed
 * five times), waits 1000 ms and then logs; and seda:hey, which carries the
 * throttling policy (max 1 inflight exchange, context scope) and forwards to a
 * mock endpoint. The test passes only if all five exchanges reach the mock.
 */
public class ThrottleTest extends CamelTestSupport {

	@Produce(uri = "direct:input")
	protected ProducerTemplate input;

	// Assigned inside configure(), so it is only usable once the routes are built.
	protected MockEndpoint resultEndpoint;

	/**
	 * Builds the two routes under test and installs an inflight repository
	 * that prints its size on every add/remove.
	 *
	 * @return the route builder defining direct:input and seda:hey
	 * @throws Exception declared by the overridden method
	 */
	@Override
	protected RouteBuilder createRouteBuilder() throws Exception {
		return new RouteBuilder() {
			@Override
			public void configure() {
				resultEndpoint = new MockEndpoint("mock:result");
				resultEndpoint.setCamelContext(getContext());

				// Diagnostic only: trace the inflight count as exchanges come and go.
				getContext().setInflightRepository(new DefaultInflightRepository() {
					@Override
					public void add(Exchange exchange) {
						super.add(exchange);
						System.out.println("                        add " + this.size());
					}

					@Override
					public void remove(Exchange exchange) {
						super.remove(exchange);
						System.out.println("                     remove " + this.size());
					}
				});

				ThrottlingInflightRoutePolicy throttler = new ThrottlingInflightRoutePolicy();
				throttler.setMaxInflightExchanges(1);
				throttler.setScope(ThrottlingScope.Context);

				// Unthrottled producer route: note it has no route policy attached.
				from("direct:input")
					.inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", "seda:hey")
					.delay(1000)
					.inOnly("log:inputDone");

				// Throttled consumer route: the only route using the policy.
				from("seda:hey")
					.routePolicy(throttler)
					.inOut("log:outputDone")
					.to(resultEndpoint);
			}
		};
	}

	/**
	 * Sends one message into direct:input and expects five exchanges at the
	 * mock endpoint.
	 *
	 * @throws Exception if sending fails or the mock expectation is not met
	 */
	public void testThatAllExchangesAreReceived() throws Exception {
		// Register the expectation before sending: the send goes through a
		// route with a 1000 ms delay, so exchanges may already be arriving at
		// the mock by the time sendBody returns.
		resultEndpoint.expectedMessageCount(5);

		input.sendBody("hello");

		resultEndpoint.assertIsSatisfied();
	}
}
{code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

[jira] [Updated] (CAMEL-4149) ThrottlingInflightRoutePolicy can deadlock

Posted by "Claus Ibsen (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen updated CAMEL-4149:
-------------------------------

         Priority: Minor  (was: Major)
       Regression:   (was: [Unit Test Broken])
    Fix Version/s: 2.9.0
                   2.8.0
         Assignee: Claus Ibsen

It's because you use seda endpoints.

> ThrottlingInflightRoutePolicy can deadlock
> ------------------------------------------
>
>                 Key: CAMEL-4149
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4149
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.7.0
>            Reporter: Søren Markert
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.8.0, 2.9.0
>
>
> Using ThrottlingInflightRoutePolicy can deadlock a route in some situations. The unit test pasted in below shows one such situation.
> What happens is that the bottom route processes its first exchange, then suspends. Since it is suspended it will not take the next exchange from the seda queue, and so it will never check whether it should re-enable the route.
> Perhaps it will work by putting the check to re-enable the route in the onExchangeBegin method, if that is called even when the route is suspended?
> {code}
> import org.apache.camel.Exchange;
> import org.apache.camel.Produce;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.DefaultInflightRepository;
> import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
> import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope;
> import org.apache.camel.test.CamelTestSupport;
> public class ThrottleTest extends CamelTestSupport {
> 	@Produce(uri = "direct:input")
> 	protected ProducerTemplate input;
> 	protected MockEndpoint resultEndpoint;
> 	@Override
> 	protected RouteBuilder createRouteBuilder() throws Exception {
> 	    return new RouteBuilder() {
>             public void configure() {
>             	resultEndpoint = new MockEndpoint("mock:result");
>             	resultEndpoint.setCamelContext(getContext());
>             	
>             	getContext().setInflightRepository(new DefaultInflightRepository() {
>             		@Override
>             	    public void add(Exchange exchange) {
>             			super.add(exchange);
>             			System.out.println("                        add " + this.size());
>             	    }
>             		@Override
>            	        public void remove(Exchange exchange) {
>             			super.remove(exchange);
>             			System.out.println("                     remove " + this.size());
>            	        }
>             		
>             	});
>             	
>             	ThrottlingInflightRoutePolicy throttler = new ThrottlingInflightRoutePolicy();
>             	
>             	throttler.setMaxInflightExchanges(1);
>             	throttler.setScope(ThrottlingScope.Context);
>             	from("direct:input")
>             		.inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", "seda:hey")
>             		.delay(1000)
>         		.inOnly("log:inputDone");
>             	
>             	from("seda:hey")
>             		.routePolicy(throttler)
>             		.inOut("log:outputDone")
>             		.to(resultEndpoint);
>             }
>         };
> 	}
> 	
> 	public void testThatAllExchangesAreReceived() throws Exception {
> 		input.sendBody("hello");
> 		
> 		resultEndpoint.expectedMessageCount(5);
> 		resultEndpoint.assertIsSatisfied();
> 	}
> }
> {code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

[jira] [Issue Comment Edited] (CAMEL-4149) ThrottlingInflightRoutePolicy can deadlock

Posted by "Claus Ibsen (JIRA)" <ji...@apache.org>.
    [ https://issues.apache.org/jira/browse/CAMEL-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13054834#comment-13054834 ] 

Claus Ibsen edited comment on CAMEL-4149 at 6/25/11 9:47 AM:
-------------------------------------------------------------

It's because you use seda endpoints, context scoped endpoints, and only define the route policy in one of the routes.

      was (Author: davsclaus):
    Its because you use seda endpoints.
  
> ThrottlingInflightRoutePolicy can deadlock
> ------------------------------------------
>
>                 Key: CAMEL-4149
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4149
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.7.0
>            Reporter: Søren Markert
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.8.0, 2.9.0
>
>
> Using ThrottlingInflightRoutePolicy can deadlock a route in some situations. The unit test pasted in below shows one such situation.
> What happens is that the bottom route processes its first exchange, then suspends. Since it is suspended it will not take the next exchange from the seda queue, and so it will never check whether it should re-enable the route.
> Perhaps it will work by putting the check to re-enable the route in the onExchangeBegin method, if that is called even when the route is suspended?
> {code}
> import org.apache.camel.Exchange;
> import org.apache.camel.Produce;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.DefaultInflightRepository;
> import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
> import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope;
> import org.apache.camel.test.CamelTestSupport;
> public class ThrottleTest extends CamelTestSupport {
> 	@Produce(uri = "direct:input")
> 	protected ProducerTemplate input;
> 	protected MockEndpoint resultEndpoint;
> 	@Override
> 	protected RouteBuilder createRouteBuilder() throws Exception {
> 	    return new RouteBuilder() {
>             public void configure() {
>             	resultEndpoint = new MockEndpoint("mock:result");
>             	resultEndpoint.setCamelContext(getContext());
>             	
>             	getContext().setInflightRepository(new DefaultInflightRepository() {
>             		@Override
>             	    public void add(Exchange exchange) {
>             			super.add(exchange);
>             			System.out.println("                        add " + this.size());
>             	    }
>             		@Override
>            	        public void remove(Exchange exchange) {
>             			super.remove(exchange);
>             			System.out.println("                     remove " + this.size());
>            	        }
>             		
>             	});
>             	
>             	ThrottlingInflightRoutePolicy throttler = new ThrottlingInflightRoutePolicy();
>             	
>             	throttler.setMaxInflightExchanges(1);
>             	throttler.setScope(ThrottlingScope.Context);
>             	from("direct:input")
>             		.inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", "seda:hey")
>             		.delay(1000)
>         		.inOnly("log:inputDone");
>             	
>             	from("seda:hey")
>             		.routePolicy(throttler)
>             		.inOut("log:outputDone")
>             		.to(resultEndpoint);
>             }
>         };
> 	}
> 	
> 	public void testThatAllExchangesAreReceived() throws Exception {
> 		input.sendBody("hello");
> 		
> 		resultEndpoint.expectedMessageCount(5);
> 		resultEndpoint.assertIsSatisfied();
> 	}
> }
> {code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

[jira] [Resolved] (CAMEL-4149) ThrottlingInflightRoutePolicy can deadlock

Posted by "Claus Ibsen (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Claus Ibsen resolved CAMEL-4149.
--------------------------------

       Resolution: Fixed
    Fix Version/s:     (was: 2.9.0)

Thanks for reporting. Fixed on trunk.

> ThrottlingInflightRoutePolicy can deadlock
> ------------------------------------------
>
>                 Key: CAMEL-4149
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4149
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.7.0
>            Reporter: Søren Markert
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.8.0
>
>
> Using ThrottlingInflightRoutePolicy can deadlock a route in some situations. The unit test pasted in below shows one such situation.
> What happens is that the bottom route processes its first exchange, then suspends. Since it is suspended it will not take the next exchange from the seda queue, and so it will never check whether it should re-enable the route.
> Perhaps it will work by putting the check to re-enable the route in the onExchangeBegin method, if that is called even when the route is suspended?
> {code}
> import org.apache.camel.Exchange;
> import org.apache.camel.Produce;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.DefaultInflightRepository;
> import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
> import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope;
> import org.apache.camel.test.CamelTestSupport;
> public class ThrottleTest extends CamelTestSupport {
> 	@Produce(uri = "direct:input")
> 	protected ProducerTemplate input;
> 	protected MockEndpoint resultEndpoint;
> 	@Override
> 	protected RouteBuilder createRouteBuilder() throws Exception {
> 	    return new RouteBuilder() {
>             public void configure() {
>             	resultEndpoint = new MockEndpoint("mock:result");
>             	resultEndpoint.setCamelContext(getContext());
>             	
>             	getContext().setInflightRepository(new DefaultInflightRepository() {
>             		@Override
>             	    public void add(Exchange exchange) {
>             			super.add(exchange);
>             			System.out.println("                        add " + this.size());
>             	    }
>             		@Override
>            	        public void remove(Exchange exchange) {
>             			super.remove(exchange);
>             			System.out.println("                     remove " + this.size());
>            	        }
>             		
>             	});
>             	
>             	ThrottlingInflightRoutePolicy throttler = new ThrottlingInflightRoutePolicy();
>             	
>             	throttler.setMaxInflightExchanges(1);
>             	throttler.setScope(ThrottlingScope.Context);
>             	from("direct:input")
>             		.inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", "seda:hey")
>             		.delay(1000)
>         		.inOnly("log:inputDone");
>             	
>             	from("seda:hey")
>             		.routePolicy(throttler)
>             		.inOut("log:outputDone")
>             		.to(resultEndpoint);
>             }
>         };
> 	}
> 	
> 	public void testThatAllExchangesAreReceived() throws Exception {
> 		input.sendBody("hello");
> 		
> 		resultEndpoint.expectedMessageCount(5);
> 		resultEndpoint.assertIsSatisfied();
> 	}
> }
> {code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira

       

[jira] [Updated] (CAMEL-4149) ThrottlingInflightRoutePolicy can deadlock

Posted by "Daniel Kulp (JIRA)" <ji...@apache.org>.
     [ https://issues.apache.org/jira/browse/CAMEL-4149?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Daniel Kulp updated CAMEL-4149:
-------------------------------

    Fix Version/s: 2.7.3

> ThrottlingInflightRoutePolicy can deadlock
> ------------------------------------------
>
>                 Key: CAMEL-4149
>                 URL: https://issues.apache.org/jira/browse/CAMEL-4149
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.7.0
>            Reporter: Søren Markert
>            Assignee: Claus Ibsen
>            Priority: Minor
>             Fix For: 2.7.3, 2.8.0
>
>
> Using ThrottlingInflightRoutePolicy can deadlock a route in some situations. The unit test pasted in below shows one such situation.
> What happens is that the bottom route processes its first exchange, then suspends. Since it is suspended it will not take the next exchange from the seda queue, and so it will never check whether it should re-enable the route.
> Perhaps it will work by putting the check to re-enable the route in the onExchangeBegin method, if that is called even when the route is suspended?
> {code}
> import org.apache.camel.Exchange;
> import org.apache.camel.Produce;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.component.mock.MockEndpoint;
> import org.apache.camel.impl.DefaultInflightRepository;
> import org.apache.camel.impl.ThrottlingInflightRoutePolicy;
> import org.apache.camel.impl.ThrottlingInflightRoutePolicy.ThrottlingScope;
> import org.apache.camel.test.CamelTestSupport;
> public class ThrottleTest extends CamelTestSupport {
> 	@Produce(uri = "direct:input")
> 	protected ProducerTemplate input;
> 	protected MockEndpoint resultEndpoint;
> 	@Override
> 	protected RouteBuilder createRouteBuilder() throws Exception {
> 	    return new RouteBuilder() {
>             public void configure() {
>             	resultEndpoint = new MockEndpoint("mock:result");
>             	resultEndpoint.setCamelContext(getContext());
>             	
>             	getContext().setInflightRepository(new DefaultInflightRepository() {
>             		@Override
>             	    public void add(Exchange exchange) {
>             			super.add(exchange);
>             			System.out.println("                        add " + this.size());
>             	    }
>             		@Override
>            	        public void remove(Exchange exchange) {
>             			super.remove(exchange);
>             			System.out.println("                     remove " + this.size());
>            	        }
>             		
>             	});
>             	
>             	ThrottlingInflightRoutePolicy throttler = new ThrottlingInflightRoutePolicy();
>             	
>             	throttler.setMaxInflightExchanges(1);
>             	throttler.setScope(ThrottlingScope.Context);
>             	from("direct:input")
>             		.inOnly("seda:hey", "seda:hey", "seda:hey", "seda:hey", "seda:hey")
>             		.delay(1000)
>         		.inOnly("log:inputDone");
>             	
>             	from("seda:hey")
>             		.routePolicy(throttler)
>             		.inOut("log:outputDone")
>             		.to(resultEndpoint);
>             }
>         };
> 	}
> 	
> 	public void testThatAllExchangesAreReceived() throws Exception {
> 		input.sendBody("hello");
> 		
> 		resultEndpoint.expectedMessageCount(5);
> 		resultEndpoint.assertIsSatisfied();
> 	}
> }
> {code}

--
This message is automatically generated by JIRA.
For more information on JIRA, see: http://www.atlassian.com/software/jira