Posted to dev@camel.apache.org by blove319 <bl...@hotmail.com> on 2017/05/10 16:53:22 UTC
Re: SJMS RejectedExecutionException bug during shutdown
I know I've used that off and on in various tests, but I'm not sure whether I tried it explicitly in this case. I will try it.
________________________________
From: Quinn Stevenson [via Camel] <ml...@n5.nabble.com>
Sent: Saturday, April 29, 2017 8:04 AM
To: blove319
Subject: Re: SJMS RejectedExecutionException bug during shutdown
Have you tried using "transacted=true" in the from URI?
> On Apr 27, 2017, at 7:18 AM, Claus Ibsen <[hidden email]> wrote:
>
> Hi
>
> Can you log a JIRA, we need to take a look at that shutdown so the
> thread pools are stopped after it's done its work.
>
>> On Wed, Apr 26, 2017 at 6:18 PM, blove319 <[hidden email]> wrote:
>> Camel versions tested: 2.16 - 2.18.3
>> Current Maven dependencies:
>> org.apache.camel:camel-test 2.18.1
>> org.apache.camel:camel-sjms 2.17.3
>>
>> SHORT VERSION:
>> When using SJMS (et al?) with an aggregator and/or splitter in the route,
>> shutting down either throws an error or tosses out messages.
>>
>>
>> LONG VERSION:
>>
>> When using an SJMS consumer to consume from a queue, with a route that has
>> an aggregator in it, I inevitably lose messages when the route stops.
>>
>> The two obvious documented aggregator modifiers do not work:
>>
>> *forceCompletionOnStop* - results in a RejectedExecutionException error
>> because the underlying thread pools are stopped/closed before the
>> "prepareShutdown" method is called on the aggregator (which is when the
>> outstanding aggregations are forced to complete and the results are handed
>> to the route for processing).
>>
>> *completeAllOnStop* - results in the route logging the number of outstanding
>> messages every second (the number never changes) until the (500 second?)
>> timeout is reached, at which point the route is forced to shut down and the
>> messages are tossed out. Presumably because there is no active thread pool
>> available to handle the messages.
>>
>> Without either of these two modifiers on the aggregator, it just tosses out
>> any unfinished aggregations on shutdown.
>>
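The ordering problem described above can be reproduced with the JDK alone. The following is only a minimal sketch, not Camel's actual shutdown code: `PendingBatches` is a hypothetical stand-in for an aggregator holding unfinished aggregations, and a plain `ExecutorService` stands in for the route's thread pool. It shows that handing work to a pool after `shutdown()` raises `RejectedExecutionException`, while flushing before stopping the pool delivers everything.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;

final class ShutdownOrderDemo {

    /** Hypothetical stand-in for an aggregator that holds unfinished batches. */
    static final class PendingBatches {
        private final List<String> pending = new ArrayList<>();
        private final List<String> delivered =
                Collections.synchronizedList(new ArrayList<>());

        void add(String batch) {
            pending.add(batch);
        }

        /** Hands every pending batch to the pool; execute() throws if the pool is shut down. */
        void flushTo(ExecutorService pool) {
            for (String batch : pending) {
                pool.execute(() -> delivered.add(batch));
            }
            pending.clear();
        }

        int deliveredCount() {
            return delivered.size();
        }
    }

    /** Pool stopped first, then flush: mirrors the failing order from the report. */
    static boolean rejectedWhenPoolStoppedFirst() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        PendingBatches batches = new PendingBatches();
        batches.add("batch-1");
        pool.shutdown();
        try {
            batches.flushTo(pool);
            return false;
        } catch (RejectedExecutionException expected) {
            return true;
        }
    }

    /** Flush first, then stop the pool and wait for the work to drain. */
    static int deliveredWhenFlushedFirst() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        PendingBatches batches = new PendingBatches();
        batches.add("batch-1");
        batches.add("batch-2");
        batches.flushTo(pool);
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not drain in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return batches.deliveredCount();
    }

    public static void main(String[] args) {
        System.out.println("rejected when pool stopped first: " + rejectedWhenPoolStoppedFirst());
        System.out.println("delivered when flushed first: " + deliveredWhenFlushedFirst());
    }
}
```

If the analysis in the report is right, the fix on the Camel side would amount to the second ordering: complete the outstanding aggregations while the pool is still accepting work, and only then stop it.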
>> Here's a sample test... It probably isn't ideally written, but it does
>> illustrate the issue...
>>
>>
>> import org.apache.activemq.junit.EmbeddedActiveMQBroker;
>> import org.apache.camel.CamelContext;
>> import org.apache.camel.Exchange;
>> import org.apache.camel.Processor;
>> import org.apache.camel.ProducerTemplate;
>> import org.apache.camel.builder.RouteBuilder;
>> import org.apache.camel.component.sjms.SjmsComponent;
>> import org.apache.camel.impl.DefaultCamelContext;
>> import org.apache.camel.util.toolbox.AggregationStrategies;
>> import org.junit.Rule;
>> import org.junit.Test;
>>
>> /**
>> * Created by bryan.love on 4/25/17.
>> */
>> public class SjmsBatchTest {
>>     @Rule
>>     public EmbeddedActiveMQBroker broker = new EmbeddedActiveMQBroker();
>>     CamelContext context = new DefaultCamelContext();
>>     ProducerTemplate template = context.createProducerTemplate();
>>
>>     @Test
>>     public void testBatch() throws Exception {
>>         SjmsComponent comp = new SjmsComponent();
>>         comp.setConnectionFactory(broker.createConnectionFactory());
>>         context.addComponent("sjms", comp);
>>         //context.setShutdownStrategy(new MyShutdownStrategy(context));
>>
>>         RouteBuilder rb = new RouteBuilder() {
>>             @Override
>>             public void configure() throws Exception {
>>                 from("sjms:queue:test-in")
>>                     .aggregate(header("CamelFileName"), AggregationStrategies.groupedExchange())
>>                         .id("fileNameAggProcessor")
>>                         .completionInterval(10000) // wait $b
>>                         .completionSize(50) // wait for $batchSize messages to aggregate
>>                         .forceCompletionOnStop()
>>                     .filter(header("CamelFileName").isNotNull())
>>                     .process(new Processor() {
>>                         @Override
>>                         public void process(Exchange exchange) throws Exception {
>>                             System.out.println("foo");
>>                         }
>>                     });
>>             }
>>         };
>>         context.addRoutes(rb);
>>
>>         context.start();
>>         template.setDefaultEndpointUri("sjms:queue:test-in");
>>         template.sendBodyAndHeader("some body", "CamelFileName", "someFileName");
>>         Thread.sleep(1000);
>>         context.stop();
>>     }
>> }
>>
>>
>>
>>
>> --
>> View this message in context: http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335.html
>> Sent from the Camel Development mailing list archive at Nabble.com.
>
>
>
> --
> Claus Ibsen
> -----------------
> http://davsclaus.com @davsclaus
> Camel in Action 2: https://www.manning.com/ibsen2
--
View this message in context: http://camel.465427.n5.nabble.com/SJMS-RejectedExecutionException-bug-during-shutdown-tp5798335p5799246.html
Sent from the Camel Development mailing list archive at Nabble.com.