You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@camel.apache.org by blove319 <bl...@hotmail.com> on 2017/05/10 16:53:22 UTC

Re: SJMS RejectedExecutionException bug during shutdown

I know I've used that off and on in various tests... but i'm not sure if I tried explicitly in this case.  I will try it.


________________________________
From: Quinn Stevenson [via Camel] <ml...@n5.nabble.com>
Sent: Saturday, April 29, 2017 8:04 AM
To: blove319
Subject: Re: SJMS RejectedExecutionException bug during shutdown

Have you tried using "transacted=true" in the from uri?


> On Apr 27, 2017, at 7:18 AM, Claus Ibsen <[hidden email]</user/SendEmail.jtp?type=node&node=5798514&i=0>> wrote:
>
> Hi
>
> Can you log a JIRA, we need to take a look at that shutdown so the
> thread pools are stopped after its done its work.
>
>> On Wed, Apr 26, 2017 at 6:18 PM, blove319 <[hidden email]</user/SendEmail.jtp?type=node&node=5798514&i=1>> wrote:
>> Camel versions tested: 2.16 - 2.18.3
>> Current Maven dependencies:
>> org.apache.camel:camel-test 2.18.1
>> org.apache.camel:camel-sjms 2.17.3
>>
>> SHORT VERSION:
>> When using SJMS (et al?) with an aggregator and/or splitter in the route,
>> shutting down either throws an error or tosses out messages.
>>
>>
>> LONG VERSION:
>>
>> When using an SJMS consumer to consume from a queue, with a route that has
>> an aggregator in it, I inevitably lose messages when the route stops.
>>
>> The two obvious documented aggregator modifiers do not work:
>>
>> *forceCompletionOnStop* - results in a RejectedExecutionException error
>> because the underlying thread pools are stopped/closed before the
>> "prepareShutdown" method is called on the aggregator (which is when the
>> outstanding aggregations are forced to complete and the results are handed
>> to the route for processing).
>>
>> *completeAllOnStop* - results in the route logging the number of outstanding
>> messages every second (the number never changes) until the (500 second?)
>> timeout is reached, at which point the route is forced to shut down and the
>> messages are tossed out. Presumably because there is no active thread pool
>> available to handle the messages.
>>
>> Without either of these two modifiers on the aggregator, it just tosses out
>> any unfinished aggregations on shutdown.
>>
>> Here's a sample test... It probably isn't ideally written, but it does
>> illustrate the issue...
>>
>>
>> import org.apache.activemq.junit.EmbeddedActiveMQBroker;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.ProducerTemplate;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.sjms.SjmsComponent;
>> import org.apache.camel.impl.DefaultCamelContext;
>> import org.apache.camel.util.toolbox.AggregationStrategies;
>> import org.junit.Rule;
>> import org.junit.Test;
>>
>> /**
>> * Created by bryan.love on 4/25/17.
>> */
>> public class SjmsBatchTest {
>>    @Rule
>>    public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();
>>    CamelContext context = new DefaultCamelContext();
>>    ProducerTemplate template = context.createProducerTemplate();
>>
>>    @Test
>>    public void testBatch() throws Exception {
>>        SjmsComponent comp = new SjmsComponent();
>>        comp.setConnectionFactory(broker.createConnectionFactory());
>>        context.addComponent("sjms", comp);
>>        //context.setShutdownStrategy(new MyShutdownStrategy(context));
>>
>>        RouteBuilder rb = new RouteBuilder() {
>>            @Override
>>            public void configure() throws Exception {
>>                from("sjms:queue:test-in")
>>                .aggregate(header("CamelFileName"),
>> AggregationStrategies.groupedExchange())
>>                    .id("fileNameAggProcessor")
>>                    .completionInterval(10000)   // wait $b
>>                    .completionSize(50)          // wait for $batchSize
>> messages to aggregate
>>                    .forceCompletionOnStop()
>>                .filter(header("CamelFileName").isNotNull())
>>                .process(new Processor() {
>>                    @Override
>>                    public void process(Exchange exchange) throws Exception
>> {
>>                        System.out.println("foo");
>>                    }
>>                });
>>            }
>>        };
>>        context.addRoutes(rb);
>>
>>        context.start();
>>        template.setDefaultEndpointUri("sjms:queue:test-in");
>>        template.sendBodyAndHeader("some body", "CamelFileName",
>> "someFileName");
>>        Thread.sleep(1000);
>>        context.stop();
>>    }
>> }
>>
>>
>>
>>
>> --
>> View this message in context: http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335.html
>> Sent from the Camel Development mailing list archive at Nabble.com.
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2


________________________________
If you reply to this email, your message will be added to the discussion below:
http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335p5798514.html
To unsubscribe from SJMS RejectedExecutionException bug during shutdown, click here<http://camel.465427.n5.nabble.com/template/NamlServlet.jtp?macro=unsubscribe_by_code&node=5798335&code=YmxvdmUzMTlAaG90bWFpbC5jb218NTc5ODMzNXwtMTkwODEzMDQ2Ng==>.
NAML<http://camel.465427.n5.nabble.com/template/NamlServlet.jtp?macro=macro_viewer&id=instant_html%21nabble%3Aemail.naml&base=nabble.naml.namespaces.BasicNamespace-nabble.view.web.template.NabbleNamespace-nabble.view.web.template.NodeNamespace&breadcrumbs=notify_subscribers%21nabble%3Aemail.naml-instant_emails%21nabble%3Aemail.naml-send_instant_email%21nabble%3Aemail.naml>




--
View this message in context: http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335p5799246.html
Sent from the Camel Development mailing list archive at Nabble.com.