Posted to dev@camel.apache.org by blove319 <bl...@hotmail.com> on 2017/04/26 16:18:57 UTC

SJMS RejectedExecutionException bug during shutdown

Camel versions tested: 2.16 - 2.18.3
Current Maven dependencies:
org.apache.camel:camel-test 2.18.1
org.apache.camel:camel-sjms 2.17.3

SHORT VERSION:
When using SJMS (et al?) with an aggregator and/or splitter in the route,
shutting down either throws an error or tosses out messages.


LONG VERSION:

When using an SJMS consumer to consume from a queue, with a route that has
an aggregator in it, I inevitably lose messages when the route stops.

The two obvious documented aggregator modifiers do not work:

*forceCompletionOnStop* - results in a RejectedExecutionException error
because the underlying thread pools are stopped/closed before the
"prepareShutdown" method is called on the aggregator (which is when the
outstanding aggregations are forced to complete and the results are handed
to the route for processing).

*completeAllOnStop* - results in the route logging the number of outstanding
messages every second (the number never changes) until the (500 second?)
timeout is reached, at which point the route is forced to shut down and the
messages are tossed out. Presumably because there is no active thread pool
available to handle the messages.

Without either of these two modifiers on the aggregator, it just tosses out
any unfinished aggregations on shutdown.
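
To make the ordering problem concrete, here is a minimal plain-JDK sketch. It
does not use Camel at all, and the class and method names are made up for
illustration. It only shows the general mechanism I suspect is at work: work
handed to an executor after that executor has been shut down is rejected,
whereas flushing first and shutting down afterwards lets the work run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical illustration only; none of these names exist in Camel.
public class ShutdownOrderSketch {

    /** Flush pending work first, then stop the pool. Returns how many tasks ran. */
    public static int flushThenShutdown() throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicInteger delivered = new AtomicInteger();
        // Stands in for "force the outstanding aggregation to complete".
        pool.execute(delivered::incrementAndGet);
        pool.shutdown();                              // already-submitted tasks still run
        pool.awaitTermination(5, TimeUnit.SECONDS);   // wait for the pool to drain
        return delivered.get();
    }

    /** Stop the pool first, then try to flush. Returns true if the task was rejected. */
    public static boolean shutdownThenFlush() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.shutdown();
        try {
            pool.execute(() -> { });                  // too late: the pool no longer accepts work
            return false;
        } catch (RejectedExecutionException e) {
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("flush then shutdown, tasks run: " + flushThenShutdown());
        System.out.println("shutdown then flush, rejected: " + shutdownThenFlush());
    }
}
```

If the aggregator's forced completion happens in the second order, a
RejectedExecutionException is what I would expect to see.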

Here's a sample test... It probably isn't ideally written, but it does
illustrate the issue...


import org.apache.activemq.junit.EmbeddedActiveMQBroker;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.sjms.SjmsComponent;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.util.toolbox.AggregationStrategies;
import org.junit.Rule;
import org.junit.Test;

/**
 * Created by bryan.love on 4/25/17.
 */
public class SjmsBatchTest {
    @Rule
    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();
    CamelContext context = new DefaultCamelContext();
    ProducerTemplate template = context.createProducerTemplate();

    @Test
    public void testBatch() throws Exception {
        // Point the SJMS component at the embedded broker.
        SjmsComponent comp = new SjmsComponent();
        comp.setConnectionFactory(broker.createConnectionFactory());
        context.addComponent("sjms", comp);
        //context.setShutdownStrategy(new MyShutdownStrategy(context));

        RouteBuilder rb = new RouteBuilder() {
            @Override
            public void configure() throws Exception {
                from("sjms:queue:test-in")
                .aggregate(header("CamelFileName"), AggregationStrategies.groupedExchange())
                    .id("fileNameAggProcessor")
                    .completionInterval(10000)   // complete every 10 seconds
                    .completionSize(50)          // or once 50 messages have been aggregated
                    .forceCompletionOnStop()
                .filter(header("CamelFileName").isNotNull())
                .process(new Processor() {
                    @Override
                    public void process(Exchange exchange) throws Exception {
                        System.out.println("foo");
                    }
                });
            }
        };
        context.addRoutes(rb);

        context.start();
        template.setDefaultEndpointUri("sjms:queue:test-in");
        template.sendBodyAndHeader("some body", "CamelFileName", "someFileName");
        // Stop well before the 10-second interval so the aggregation is still pending.
        Thread.sleep(1000);
        context.stop();
    }
}




--
View this message in context: http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335.html
Sent from the Camel Development mailing list archive at Nabble.com.

Re: SJMS RejectedExecutionException bug during shutdown

Posted by blove319 <bl...@hotmail.com>.
I know I've used that off and on in various tests, but I'm not sure whether I tried it explicitly in this case. I will try it.


________________________________
From: Quinn Stevenson [via Camel] <ml...@n5.nabble.com>
Sent: Saturday, April 29, 2017 8:04 AM
To: blove319
Subject: Re: SJMS RejectedExecutionException bug during shutdown

Have you tried using "transacted=true" in the from uri?






--
View this message in context: http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335p5799246.html
Sent from the Camel Development mailing list archive at Nabble.com.

Re: SJMS RejectedExecutionException bug during shutdown

Posted by qu...@pronoia-solutions.com.
Have you tried using "transacted=true" in the from uri?



Re: SJMS RejectedExecutionException bug during shutdown

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

Can you log a JIRA? We need to take a look at that shutdown so the
thread pools are stopped only after the aggregator has done its work.




-- 
Claus Ibsen
-----------------
http://davsclaus.com @davsclaus
Camel in Action 2: https://www.manning.com/ibsen2