Posted to users@camel.apache.org by Vincent Lombart <ap...@lombart.be> on 2012/12/31 02:10:40 UTC

Fork/join with resequencing

Hello all,

I am trying to implement a fork/join process where work is read by a single
thread, distributed to several parallel threads then joined back to another
single thread. I am using Camel SEDA components for interfacing between
threads. The single thread at the end must process the messages in the
correct order which is why I add a stream resequencer in the route.
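
The ordering requirement at the join can be sketched in plain Java, without Camel. This is only an illustration of the idea behind a stream resequencer (buffer out-of-order sequence numbers, release them once the next expected number has arrived). It is not Camel's implementation: InOrderBuffer is a hypothetical class, it assumes sequence numbers start at 0 with no gaps, and it leaves out the time-out and capacity handling that the real resequencer provides.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical illustration class; not part of Camel.
class InOrderBuffer {
    // Sequence numbers that arrived early, smallest first.
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    // Next sequence number the single consumer expects (assumed to start at 0).
    private long next = 0;

    // Accept one sequence number and return everything that can now be
    // released in order (possibly an empty list).
    List<Long> offer(long seqnum) {
        pending.add(seqnum);
        List<Long> released = new ArrayList<>();
        while (!pending.isEmpty() && pending.peek() == next) {
            released.add(pending.poll());
            next++;
        }
        return released;
    }

    public static void main(String[] args) {
        InOrderBuffer buffer = new InOrderBuffer();
        long[] arrivals = {2, 0, 1, 4, 3};
        for (long seqnum : arrivals) {
            // prints: 2 -> [], 0 -> [0], 1 -> [1, 2], 4 -> [], 3 -> [3, 4]
            System.out.println(seqnum + " -> " + buffer.offer(seqnum));
        }
    }
}
```

A real resequencer cannot wait forever for a missing number, which is where the time-out and capacity settings discussed below come in.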

I have built a small prototype that works correctly with a small number of
messages (1000 messages, 10 threads in parallel). However if I increase the
number of messages (10000 messages, 10 threads) I start getting null pointer
exceptions:
01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]

I have tried adjusting the resequencer time-out and capacity without any
result. My CPU is only lightly loaded as there is plenty of sleep in the
dummy processor. If anybody could tell me what I am doing wrong I would be
very grateful. Here is the complete code of my prototype class:

package camelTest;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 
 * Simulation of a fork / join parallel processing.
 * 
 * Work is created by the main method, distributed (seda:fork) to several
 * parallel workers, joined back to a single queue (seda:join) and
 * resequenced.
 * 
 */
public class MainApp {

	private static final Logger LOG = LoggerFactory.getLogger(MainApp.class);
	private static final int NUMBER_OF_MESSAGES = 10000;

	public static void main(String... args) throws Exception {
		CamelContext context = new DefaultCamelContext();
		context.addRoutes(new MyRouteBuilder());
		context.start();

		LOG.info("Started");

		ProducerTemplate template = context.createProducerTemplate();
		for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
			template.sendBodyAndHeader("seda:fork", "Test Message: " + i,
					"seqnum", Long.valueOf(i));
		}

		long expectedTime = NUMBER_OF_MESSAGES
				* (RandomSleepProcessor.MAX_PROCESS_TIME + RandomSleepProcessor.MIN_PROCESS_TIME)
				/ 2 / MyRouteBuilder.CONCURRENCY + MyRouteBuilder.TIMEOUT;
		LOG.info("Expected time: {}", expectedTime);
		Thread.sleep(expectedTime);
		LOG.info("Stopping");
		context.stop();
		LOG.info("Stopped");

	}

	public static class MyRouteBuilder extends RouteBuilder {
		// Number of concurrent processing threads
		public static final int CONCURRENCY = 10;

		// Additional resequencer time-out above theoretical time-out
		public static final long SAFETY_TIMEOUT = 100;

		// Additional resequencer capacity above theoretical capacity
		public static final int SAFETY_CAPACITY = 10;

		// Resequencer time-out
		public static final long TIMEOUT = SAFETY_TIMEOUT
				+ (RandomSleepProcessor.MAX_PROCESS_TIME - RandomSleepProcessor.MIN_PROCESS_TIME);

		// Resequencer capacity
		public static final int CAPACITY = SAFETY_CAPACITY
				+ (int) (CONCURRENCY * TIMEOUT / RandomSleepProcessor.MIN_PROCESS_TIME);

		public void configure() {
			LOG.info("Number of processor threads: {}", CONCURRENCY);
			LOG.info("Resequencer time-out: {}", TIMEOUT);
			LOG.info("Resequencer capacity: {}", CAPACITY);
			Processor myProcessor = new RandomSleepProcessor();
			from("seda:fork?concurrentConsumers=" + CONCURRENCY).process(
					myProcessor).to("seda:join");
			from("seda:join").resequence(header("seqnum")).stream()
					.capacity(CAPACITY).timeout(TIMEOUT).to("mock:result");
		}

	}

	/**
	 * Simulation processor that sleeps a random time between MIN_PROCESS_TIME
	 * and MAX_PROCESS_TIME milliseconds.
	 * 
	 */
	public static class RandomSleepProcessor implements Processor {
		public static final long MIN_PROCESS_TIME = 5;
		public static final long MAX_PROCESS_TIME = 50;

		@Override
		public void process(Exchange arg0) throws Exception {
			long processTime = (long) (MIN_PROCESS_TIME + Math.random()
					* (MAX_PROCESS_TIME - MIN_PROCESS_TIME));
			LOG.debug("Process time: {}", processTime);
			Thread.sleep(processTime);
		}

	}

}
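
As a cross-check of the sizing formulas used in MyRouteBuilder and in main, the following standalone sketch repeats the arithmetic with the same constants. ResequencerSizing is a hypothetical helper, not part of the prototype, and it keeps the integer division of the original expressions.

```java
// Hypothetical helper that only repeats the prototype's arithmetic.
class ResequencerSizing {
    static final long MIN_PROCESS_TIME = 5;   // ms, as in RandomSleepProcessor
    static final long MAX_PROCESS_TIME = 50;  // ms, as in RandomSleepProcessor
    static final int CONCURRENCY = 10;
    static final long SAFETY_TIMEOUT = 100;
    static final int SAFETY_CAPACITY = 10;

    // Safety margin plus the spread between slowest and fastest worker.
    static long timeout() {
        return SAFETY_TIMEOUT + (MAX_PROCESS_TIME - MIN_PROCESS_TIME);
    }

    // Safety margin plus the number of messages all workers could finish
    // within one time-out window at the fastest processing time.
    static int capacity() {
        return SAFETY_CAPACITY + (int) (CONCURRENCY * timeout() / MIN_PROCESS_TIME);
    }

    // Average processing time per message, spread over the workers,
    // plus one resequencer time-out.
    static long expectedTime(int numberOfMessages) {
        return numberOfMessages * (MAX_PROCESS_TIME + MIN_PROCESS_TIME)
                / 2 / CONCURRENCY + timeout();
    }

    public static void main(String[] args) {
        System.out.println("timeout  = " + timeout());           // 145
        System.out.println("capacity = " + capacity());          // 300
        System.out.println("expected = " + expectedTime(10000)); // 27645
    }
}
```

These are the same values (145, 300 and 27645) that the run reports in its log.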

And here is the full log of a run:
01:35:45.632 [main] INFO  camelTest.MainApp - Number of processor threads: 10
01:35:45.635 [main] INFO  camelTest.MainApp - Resequencer time-out: 145
01:35:45.635 [main] INFO  camelTest.MainApp - Resequencer capacity: 300
01:35:45.674 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is starting
01:35:45.733 [main] INFO  o.a.c.m.ManagementStrategyFactory - JMX enabled.
01:35:45.938 [main] INFO  o.a.c.i.c.DefaultTypeConverter - Loaded 172 type converters
01:35:46.333 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[seda://fork?concurrentConsumers=10]
01:35:46.348 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://join]
01:35:46.348 [main] INFO  o.a.c.m.DefaultManagementLifecycleStrategy - StatisticsLevel at All so enabling load performance statistics
01:35:46.360 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
01:35:46.360 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.688 seconds
01:35:46.360 [main] INFO  camelTest.MainApp - Started
01:35:48.061 [main] INFO  camelTest.MainApp - Expected time: 27645
01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:02.994 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:10.913 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
	at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
	at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:15.706 [main] INFO  camelTest.MainApp - Stopping
01:36:15.706 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutting down
01:36:15.707 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds)
01:36:16.778 [Camel (camel-1) thread #14 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete, was consuming from: Endpoint[seda://join]
01:36:16.779 [Camel (camel-1) thread #14 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[seda://fork?concurrentConsumers=10]
01:36:16.779 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 1 seconds
01:36:16.784 [main] INFO  o.a.c.i.c.DefaultTypeConverter - TypeConverterRegistry utilization[attempts=264380, hits=264380, misses=0, failures=0] mappings[total=172, misses=0]
01:36:16.786 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutdown in 1.079 seconds. Uptime 31.114 seconds.
01:36:16.786 [main] INFO  camelTest.MainApp - Stopped

Vincent




--
View this message in context: http://camel.465427.n5.nabble.com/Fork-join-with-resequencing-tp5724727.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Fork/join with resequencing

Posted by Vincent Lombart <ap...@lombart.be>.
Hello Taariq,

Thank you for taking the time to look into my problem. As suggested, I have
opened the Jira issue CAMEL-5925
<https://issues.apache.org/jira/browse/CAMEL-5925> so that you can attach a
patch.

Best regards
Vincent








Re: Fork/join with resequencing

Posted by Taariq Levack <ta...@gmail.com>.
For completeness: that snippet wasn't complete. If you apply the change, you'll see that you also need to use that unitOfWork reference throughout the method; otherwise the null pointer will likely only be delayed.
I'll run the camel-core tests tomorrow, but your code worked consistently.
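
To illustrate what "use that reference throughout the method" can look like, here is a minimal sketch under stated assumptions. It is not the Camel source: RouteContextStack and Holder are hypothetical stand-ins, and the method simply pairs a push with a pop around some work. The point is that the field is read once and the same local is used for both operations, so clearing the field in the meantime cannot cause a null pointer or an unbalanced stack.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical illustration; none of these types are Camel classes.
class PushPopSketch {

    // Stand-in for a unit of work that keeps a stack of route contexts.
    static class RouteContextStack {
        final Deque<String> contexts = new ArrayDeque<>();
    }

    // Stand-in for an exchange whose unit of work can be cleared concurrently.
    static class Holder {
        volatile RouteContextStack unitOfWork;
    }

    static void process(Holder exchange, String routeContext, Runnable work) {
        // Read the shared field once and reuse the local for push AND pop.
        final RouteContextStack unitOfWork = exchange.unitOfWork;
        if (unitOfWork != null) {
            unitOfWork.contexts.push(routeContext);
        }
        try {
            work.run();
        } finally {
            if (unitOfWork != null) {
                unitOfWork.contexts.pop();
            }
        }
    }

    public static void main(String[] args) {
        RouteContextStack stack = new RouteContextStack();
        Holder exchange = new Holder();
        exchange.unitOfWork = stack;

        // The work clears the field, as another thread might during processing.
        process(exchange, "route2", () -> exchange.unitOfWork = null);

        // prints "contexts left on the stack: 0"
        System.out.println("contexts left on the stack: " + stack.contexts.size());
    }
}
```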

Taariq


Re: Fork/join with resequencing

Posted by Taariq Levack <ta...@gmail.com>.
Hi Vincent

I reproduced your error with your code in Camel 2.9 and 2.10, and maybe
you've found a bug.
At first I thought it's your sleep calculations but even so we shouldn't
throw a null pointer.

Please file a JIRA so I can attach a patch and you can see if it works for
you too, but with the change I don't get this error. Or apply it and try a
build.
I'm not sure if I'll have time soon to dig further into why this is, but if
so I'll take a few stabs at it, and maybe somebody else already knows why
and has a better solution.

The change is in RouteContextProcessor, line 42. Replace it with the
following so that the unit of work isn't yanked out from underneath it. For
some reason it's set to null; so far I only see valid reasons for that,
though with concurrency it might not be valid at a certain time or in a
certain context.

final UnitOfWork unitOfWork = exchange.getUnitOfWork();
if (unitOfWork != null) {
    unitOfWork.pushRouteContext(routeContext);
}
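
The reasoning behind this change can be illustrated outside Camel. The sketch below is a deterministic simulation, not Camel's code: UnitOfWorkStub and ExchangeStub are hypothetical stand-ins, and the stub getter clears the field after every read to imitate another thread nulling the unit of work between two reads. It also assumes that the original code read the field twice (check, then use), which this thread does not show.

```java
// Hypothetical illustration; the nested types are NOT Camel classes.
class LocalReferenceSketch {

    static class UnitOfWorkStub {
        int pushed = 0;

        void pushRouteContext(String routeContext) {
            pushed++;
        }
    }

    static class ExchangeStub {
        private UnitOfWorkStub unitOfWork;

        ExchangeStub(UnitOfWorkStub unitOfWork) {
            this.unitOfWork = unitOfWork;
        }

        // Returns the current value, then clears it, imitating a second
        // thread that sets the unit of work to null right after a read.
        UnitOfWorkStub getUnitOfWork() {
            UnitOfWorkStub current = unitOfWork;
            unitOfWork = null;
            return current;
        }
    }

    // Check-then-act with two reads: the second read may see null.
    static void pushWithTwoReads(ExchangeStub exchange, String routeContext) {
        if (exchange.getUnitOfWork() != null) {
            exchange.getUnitOfWork().pushRouteContext(routeContext);
        }
    }

    // Single read into a local: the reference that was checked is the one used.
    static void pushWithLocalCopy(ExchangeStub exchange, String routeContext) {
        final UnitOfWorkStub unitOfWork = exchange.getUnitOfWork();
        if (unitOfWork != null) {
            unitOfWork.pushRouteContext(routeContext);
        }
    }

    public static void main(String[] args) {
        try {
            pushWithTwoReads(new ExchangeStub(new UnitOfWorkStub()), "route2");
        } catch (NullPointerException e) {
            System.out.println("two reads: NullPointerException");
        }

        UnitOfWorkStub unitOfWork = new UnitOfWorkStub();
        pushWithLocalCopy(new ExchangeStub(unitOfWork), "route2");
        System.out.println("local copy: pushed " + unitOfWork.pushed + " route context(s)");
    }
}
```

In real concurrent code the window between the two reads is tiny, which would be consistent with the exception showing up only occasionally and only under higher message volume.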

Taariq




Re: Fork/join with resequencing

Posted by Vincent Lombart <ap...@lombart.be>.
Hello,

Does anybody have an idea about what could sometimes cause a
NullPointerException in Camel code in the second route (all my messages are
built with the "seqnum" header)?

Vincent Lombart wrote
> 			from("seda:fork?concurrentConsumers=" + CONCURRENCY).process(
> 					myProcessor).to("seda:join");
> 			from("seda:join").resequence(header("seqnum")).stream()
> 					.capacity(CAPACITY).timeout(TIMEOUT).to("mock:result");

Here is the exception:
01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]

Best regards
Vincent


