Posted to issues@camel.apache.org by "Taariq Levack (JIRA)" <ji...@apache.org> on 2013/01/04 06:54:12 UTC
[jira] [Assigned] (CAMEL-5925) NullPointerException in RouteContextProcessor
[ https://issues.apache.org/jira/browse/CAMEL-5925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Taariq Levack reassigned CAMEL-5925:
------------------------------------
Assignee: Taariq Levack
> NullPointerException in RouteContextProcessor
> ---------------------------------------------
>
> Key: CAMEL-5925
> URL: https://issues.apache.org/jira/browse/CAMEL-5925
> Project: Camel
> Issue Type: Bug
> Components: camel-core
> Affects Versions: 2.10.3
> Environment: Ubuntu Linux 12.04 (amd64), openjdk 6 (6b24-1.11.5-0ubuntu1~12.04.1)
> Reporter: Vincent Lombart
> Assignee: Taariq Levack
>
> I am trying to implement a fork/join process where work is read by a single thread, distributed to several parallel threads, then joined back to another single thread. I am using Camel SEDA components to pass messages between threads. The single thread at the end must process the messages in the correct order, which is why I add a stream resequencer to the route.
> I have built a small prototype that works correctly with a small number of messages (1000 messages, 10 threads in parallel). However, if I increase the number of messages (10000 messages, 10 threads), I start getting null pointer exceptions:
> 01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
> at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> I have tried adjusting the resequencer time-out and capacity, without any effect. My CPU is only lightly loaded, as the dummy processor spends most of its time sleeping. If anybody could tell me what I am doing wrong, I would be very grateful. Here is the complete code of my prototype class:
> package camelTest;
>
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> /**
>  * Simulation of a fork / join parallel processing.
>  *
>  * Work is created by the main method, distributed (seda:fork) to several
>  * parallel workers, joined back to a single queue (seda:join) and resequenced.
>  */
> public class MainApp {
>     private static final Logger LOG = LoggerFactory.getLogger(MainApp.class);
>     private static final int NUMBER_OF_MESSAGES = 10000;
>
>     public static void main(String... args) throws Exception {
>         CamelContext context = new DefaultCamelContext();
>         context.addRoutes(new MyRouteBuilder());
>         context.start();
>         LOG.info("Started");
>         ProducerTemplate template = context.createProducerTemplate();
>         for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
>             template.sendBodyAndHeader("seda:fork", "Test Message: " + i,
>                     "seqnum", new Long(i));
>         }
>         long expectedTime = NUMBER_OF_MESSAGES
>                 * (RandomSleepProcessor.MAX_PROCESS_TIME + RandomSleepProcessor.MIN_PROCESS_TIME)
>                 / 2 / MyRouteBuilder.CONCURRENCY + MyRouteBuilder.TIMEOUT;
>         LOG.info("Expected time: {}", expectedTime);
>         Thread.sleep(expectedTime);
>         LOG.info("Stopping");
>         context.stop();
>         LOG.info("Stopped");
>     }
>
>     public static class MyRouteBuilder extends RouteBuilder {
>         // Number of concurrent processing threads
>         public static final int CONCURRENCY = 10;
>         // Additional resequencer time-out above theoretical time-out
>         public static final long SAFETY_TIMEOUT = 100;
>         // Additional resequencer capacity above theoretical capacity
>         public static final int SAFETY_CAPACITY = 10;
>         // Resequencer time-out
>         public static final long TIMEOUT = SAFETY_TIMEOUT
>                 + (RandomSleepProcessor.MAX_PROCESS_TIME - RandomSleepProcessor.MIN_PROCESS_TIME);
>         // Resequencer capacity
>         public static final int CAPACITY = SAFETY_CAPACITY
>                 + (int) (CONCURRENCY * TIMEOUT / RandomSleepProcessor.MIN_PROCESS_TIME);
>
>         public void configure() {
>             LOG.info("Number of processor threads: {}", CONCURRENCY);
>             LOG.info("Resequencer time-out: {}", TIMEOUT);
>             LOG.info("Resequencer capacity: {}", CAPACITY);
>             Processor myProcessor = new RandomSleepProcessor();
>             from("seda:fork?concurrentConsumers=" + CONCURRENCY).process(
>                     myProcessor).to("seda:join");
>             from("seda:join").resequence(header("seqnum")).stream()
>                     .capacity(CAPACITY).timeout(TIMEOUT).to("mock:result");
>         }
>     }
>
>     /**
>      * Simulation processor that sleeps a random time between MIN_PROCESS_TIME
>      * and MAX_PROCESS_TIME milliseconds.
>      */
>     public static class RandomSleepProcessor implements Processor {
>         public static final long MIN_PROCESS_TIME = 5;
>         public static final long MAX_PROCESS_TIME = 50;
>
>         @Override
>         public void process(Exchange arg0) throws Exception {
>             long processTime = (long) (MIN_PROCESS_TIME + Math.random()
>                     * (MAX_PROCESS_TIME - MIN_PROCESS_TIME));
>             LOG.debug("Process time: {}", processTime);
>             Thread.sleep(processTime);
>         }
>     }
> }
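The sizing arithmetic in the prototype can be checked on its own, without Camel. The sketch below copies the constants from the reported class and recomputes the resequencer time-out, the capacity, and the sleep time used by main. The class name ResequencerSizing and its method names are hypothetical and exist only for this illustration.

```java
// Stand-alone check of the prototype's sizing formulas.
// ResequencerSizing is a hypothetical name; the constants and formulas
// are copied from the MainApp class in the report.
final class ResequencerSizing {
    static final long MIN_PROCESS_TIME = 5;    // ms, shortest simulated work
    static final long MAX_PROCESS_TIME = 50;   // ms, longest simulated work
    static final int CONCURRENCY = 10;         // parallel consumers on seda:fork
    static final long SAFETY_TIMEOUT = 100;    // ms added to the time-out
    static final int SAFETY_CAPACITY = 10;     // slots added to the capacity
    static final int NUMBER_OF_MESSAGES = 10000;

    // Time-out: the spread between slowest and fastest worker, plus a margin.
    static long timeout() {
        return SAFETY_TIMEOUT + (MAX_PROCESS_TIME - MIN_PROCESS_TIME);
    }

    // Capacity: messages that can arrive within one time-out if every
    // worker runs at the minimum process time, plus a margin.
    static int capacity() {
        return SAFETY_CAPACITY + (int) (CONCURRENCY * timeout() / MIN_PROCESS_TIME);
    }

    // Sleep time in main: average process time per message, divided across
    // the workers, plus one time-out.
    static long expectedTime() {
        return NUMBER_OF_MESSAGES * (MAX_PROCESS_TIME + MIN_PROCESS_TIME)
                / 2 / CONCURRENCY + timeout();
    }

    public static void main(String[] args) {
        System.out.println("Resequencer time-out: " + timeout());   // 145
        System.out.println("Resequencer capacity: " + capacity());  // 300
        System.out.println("Expected time: " + expectedTime());     // 27645
    }
}
```

These results match the values 145, 300 and 27645 logged in the run, so the configuration reaching the resequencer is the one the formulas intend.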
> And here is the full log of a run:
> 01:35:45.632 [main] INFO camelTest.MainApp - Number of processor threads: 10
> 01:35:45.635 [main] INFO camelTest.MainApp - Resequencer time-out: 145
> 01:35:45.635 [main] INFO camelTest.MainApp - Resequencer capacity: 300
> 01:35:45.674 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is starting
> 01:35:45.733 [main] INFO o.a.c.m.ManagementStrategyFactory - JMX enabled.
> 01:35:45.938 [main] INFO o.a.c.i.c.DefaultTypeConverter - Loaded 172 type converters
> 01:35:46.333 [main] INFO o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[seda://fork?concurrentConsumers=10]
> 01:35:46.348 [main] INFO o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://join]
> 01:35:46.348 [main] INFO o.a.c.m.DefaultManagementLifecycleStrategy - StatisticsLevel at All so enabling load performance statistics
> 01:35:46.360 [main] INFO o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
> 01:35:46.360 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.688 seconds
> 01:35:46.360 [main] INFO camelTest.MainApp - Started
> 01:35:48.061 [main] INFO camelTest.MainApp - Expected time: 27645
> 01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
> at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:02.994 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
> at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:10.913 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
> at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
> at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:15.706 [main] INFO camelTest.MainApp - Stopping
> 01:36:15.706 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutting down
> 01:36:15.707 [main] INFO o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds)
> 01:36:16.778 [Camel (camel-1) thread #14 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete, was consuming from: Endpoint[seda://join]
> 01:36:16.779 [Camel (camel-1) thread #14 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[seda://fork?concurrentConsumers=10]
> 01:36:16.779 [main] INFO o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 1 seconds
> 01:36:16.784 [main] INFO o.a.c.i.c.DefaultTypeConverter - TypeConverterRegistry utilization[attempts=264380, hits=264380, misses=0, failures=0] mappings[total=172, misses=0]
> 01:36:16.786 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutdown in 1.079 seconds. Uptime 31.114 seconds.
> 01:36:16.786 [main] INFO camelTest.MainApp - Stopped
> Vincent
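The ordering requirement in the report (parallel workers finish out of order, the final consumer needs ascending seqnum) can be illustrated with a small model in plain Java. This is only a sketch of in-order delivery by sequence number, not Camel's StreamResequencer: it has no time-out and no capacity limit, and the class name InOrderBuffer and its offer method are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Simplified model of in-order delivery by sequence number.
// Hypothetical illustration only; it is not how Camel implements
// its stream resequencer and it ignores time-out and capacity.
final class InOrderBuffer {
    private final PriorityQueue<Long> pending = new PriorityQueue<Long>();
    private long next; // next sequence number allowed to leave the buffer

    InOrderBuffer(long first) {
        this.next = first;
    }

    // Accepts one sequence number and returns every number that can now
    // be delivered in order (possibly none).
    List<Long> offer(long seqnum) {
        pending.add(seqnum);
        List<Long> ready = new ArrayList<Long>();
        while (!pending.isEmpty() && pending.peek() <= next) {
            long head = pending.poll();
            if (head == next) {
                ready.add(head);
                next++;
            }
            // head < next: stale or duplicate number, dropped silently
        }
        return ready;
    }

    public static void main(String[] args) {
        InOrderBuffer buffer = new InOrderBuffer(0);
        System.out.println(buffer.offer(2)); // [] (waiting for 0 and 1)
        System.out.println(buffer.offer(0)); // [0]
        System.out.println(buffer.offer(1)); // [1, 2]
    }
}
```

A real stream resequencer also has to decide what to do when a gap is never filled, which is what the time-out and capacity settings in the prototype control.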