You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@camel.apache.org by Aida <ai...@gmail.com> on 2013/08/20 10:08:18 UTC

[Splitter] Problem with stopOnException when using threadPoolExecutor

Hi,

I'm working with a little proof of concept because I'm having trouble
managing exceptions in splitter when I combine stopOnException and the usage
of a threadPoolExecutor.

I have been working with documentation, and Camel In Action book and source
examples but I'm not able to solve this.

My route is this one:

//Error handling
onException(Throwable.class)
	.to(URI_STOP_WHEN_EXCEPTION);

from(URI_START_PROCESSING)

.split(body()).aggregationStrategy(myAggregationStrategy).executorService(threadPoolExecutor).stopOnException()
		.process(new Processor() {
			@Override
			public void process(Exchange exchange) throws Exception {
				if( (":)".equals(exchange.getIn().getBody(String.class)))  ||  ("world
".equals(exchange.getIn().getBody(String.class))) ){
					throw new RuntimeCamelException();
				}
			}
		})
	.end()
		.to(URI_STOP_WITHOUT_EXCEPTION);


What I expected to happen is that, when an error occurs in the splitter,
only one (and no more) messages arrive to URI_STOP_WHEN_EXCEPTION.

If I send a body like: 
List<String> body = Arrays.asList("Hello ", "world ", ":)");

The result of this is that "world " and  ":)" arrive to
URI_STOP_WHEN_EXCEPTION (these are the bodies that the processor inside the
splitter will use to throw an exception).

If I don´t use a threadPoolExecutor only one error happens in the same
instant, so I have the expected behaviour, but using the threadPoolExecutor
(size 10) happens what I explained.

At this point I thought that maybe using the aggregationStrategy to
propagate back the exception was a good alternative. I removed the
stopOnException() and used the "MyPropagateFailureAggregationStrategy" that
appears in the book of Camel In Action (page 265, I don´t know if I'm
allowed to paste the source code). So my route now looks like:

//Error handling
onException(Throwable.class)
	.to(URI_STOP_WHEN_EXCEPTION);

from(URI_START_PROCESSING)

.split(body()).aggregationStrategy(myPropagateFailureAggregationStrategy).executorService(threadPoolExecutor)
		.process(new Processor() {
			@Override
			public void process(Exchange exchange) throws Exception {
				if( (":)".equals(exchange.getIn().getBody(String.class)))  ||  ("world
".equals(exchange.getIn().getBody(String.class))) ){
					throw new RuntimeCamelException();
				}
			}
		})
	.end()
		.to(URI_STOP_WITHOUT_EXCEPTION);


Nevertheless the result is the same in this case ... I don´t know if the
paralellism could be a problem, I suppose that I'm missing something
important

I'm working with Camel 2.10.4

Thanks in advance for your time.

  Aida.



--
View this message in context: http://camel.465427.n5.nabble.com/Splitter-Problem-with-stopOnException-when-using-threadPoolExecutor-tp5737562.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: [Splitter] Problem with stopOnException when using threadPoolExecutor

Posted by Claus Ibsen <cl...@gmail.com>.
Yeah the aggregate method in AggregationStrategy is thread-safe, and
its also invoked in the "order", eg first splitted message, 2nd
splitted message, ... N splitted message.

Though stop on exception can still make a little sense if the data you
split is bigger than the thread pool size + queue size, in case some
exception occurred, then the splitter wont have to split all messages,
for then to fail after that.

On Tue, Aug 20, 2013 at 11:19 AM, Aida <ai...@gmail.com> wrote:
> Hi Claus,
>
> Thank you for your response, it makes sense. I suppose than then the right
> way to go would be use the aggregationStrategy to propagate back the
> exception. As in this case I have the same behaviour and only for checking:
> threadPool shouldn´t interfere in this case, right?
>
>
> Thanks.
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Splitter-Problem-with-stopOnException-when-using-threadPoolExecutor-tp5737562p5737570.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen

Re: [Splitter] Problem with stopOnException when using threadPoolExecutor

Posted by Aida <ai...@gmail.com>.
Hi Claus,

Thank you for your response, it makes sense. I suppose than then the right
way to go would be use the aggregationStrategy to propagate back the
exception. As in this case I have the same behaviour and only for checking:
threadPool shouldn´t interfere in this case, right?


Thanks.



--
View this message in context: http://camel.465427.n5.nabble.com/Splitter-Problem-with-stopOnException-when-using-threadPoolExecutor-tp5737562p5737570.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: [Splitter] Problem with stopOnException when using threadPoolExecutor

Posted by Claus Ibsen <cl...@gmail.com>.
When you use a thread pool then paralllel work happens and the order
they are processed can be anyhow the JDK thread pool executes them.

If you are not using thread pool then the order is sequential and only
1 work at a time.

So with thread pool and stop on exception then the JDK may have
already started the other tasks and they cannot be stopped etc. Only
the non-started tasks will be stopped when stopOnException is
triggered.

On Tue, Aug 20, 2013 at 10:08 AM, Aida <ai...@gmail.com> wrote:
> Hi,
>
> I'm working with a little proof of concept because I'm having trouble
> managing exceptions in splitter when I combine stopOnException and the usage
> of a threadPoolExecutor.
>
> I have been working with documentation, and Camel In Action book and source
> examples but I'm not able to solve this.
>
> My route is this one:
>
> //Error handling
> onException(Throwable.class)
>         .to(URI_STOP_WHEN_EXCEPTION);
>
> from(URI_START_PROCESSING)
>
> .split(body()).aggregationStrategy(myAggregationStrategy).executorService(threadPoolExecutor).stopOnException()
>                 .process(new Processor() {
>                         @Override
>                         public void process(Exchange exchange) throws Exception {
>                                 if( (":)".equals(exchange.getIn().getBody(String.class)))  ||  ("world
> ".equals(exchange.getIn().getBody(String.class))) ){
>                                         throw new RuntimeCamelException();
>                                 }
>                         }
>                 })
>         .end()
>                 .to(URI_STOP_WITHOUT_EXCEPTION);
>
>
> What I expected to happen is that, when an error occurs in the splitter,
> only one (and no more) messages arrive to URI_STOP_WHEN_EXCEPTION.
>
> If I send a body like:
> List<String> body = Arrays.asList("Hello ", "world ", ":)");
>
> The result of this is that "world " and  ":)" arrive to
> URI_STOP_WHEN_EXCEPTION (these are the bodies that the processor inside the
> splitter will use to throw an exception).
>
> If I don´t use a threadPoolExecutor only one error happens in the same
> instant, so I have the expected behaviour, but using the threadPoolExecutor
> (size 10) happens what I explained.
>
> At this point I thought that maybe using the aggregationStrategy to
> propagate back the exception was a good alternative. I removed the
> stopOnException() and used the "MyPropagateFailureAggregationStrategy" that
> appears in the book of Camel In Action (page 265, I don´t know if I'm
> allowed to paste the source code). So my route now looks like:
>
> //Error handling
> onException(Throwable.class)
>         .to(URI_STOP_WHEN_EXCEPTION);
>
> from(URI_START_PROCESSING)
>
> .split(body()).aggregationStrategy(myPropagateFailureAggregationStrategy).executorService(threadPoolExecutor)
>                 .process(new Processor() {
>                         @Override
>                         public void process(Exchange exchange) throws Exception {
>                                 if( (":)".equals(exchange.getIn().getBody(String.class)))  ||  ("world
> ".equals(exchange.getIn().getBody(String.class))) ){
>                                         throw new RuntimeCamelException();
>                                 }
>                         }
>                 })
>         .end()
>                 .to(URI_STOP_WITHOUT_EXCEPTION);
>
>
> Nevertheless the result is the same in this case ... I don´t know if the
> paralellism could be a problem, I suppose that I'm missing something
> important
>
> I'm working with Camel 2.10.4
>
> Thanks in advance for your time.
>
>   Aida.
>
>
>
> --
> View this message in context: http://camel.465427.n5.nabble.com/Splitter-Problem-with-stopOnException-when-using-threadPoolExecutor-tp5737562.html
> Sent from the Camel - Users mailing list archive at Nabble.com.



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen