Posted to issues@camel.apache.org by "Taariq Levack (JIRA)" <ji...@apache.org> on 2013/01/04 17:06:14 UTC

[jira] [Updated] (CAMEL-5925) NullPointerException in RouteContextProcessor

     [ https://issues.apache.org/jira/browse/CAMEL-5925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Taariq Levack updated CAMEL-5925:
---------------------------------

    Attachment: CAMEL-5925_NullPointerException_in_RouteContextProcessor.patch

Possible patch attached; see the Nabble thread at http://camel.465427.n5.nabble.com/Fork-join-with-resequencing-td5724727.html

The unit test provided is based on Vincent's main class. It reproduces the issue only once every couple of runs; often the test passes.
With the attached patch, the error no longer occurs and the test always passes.
All camel-core tests also pass.

Are there any other implications? Also, should the unit of work obtained at line 41 be reused at line 49?
                
> NullPointerException in RouteContextProcessor
> ---------------------------------------------
>
>                 Key: CAMEL-5925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-5925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.10.3
>         Environment: Ubuntu Linux 12.04 (amd64), openjdk 6 (6b24-1.11.5-0ubuntu1~12.04.1)
>            Reporter: Vincent Lombart
>         Attachments: CAMEL-5925_NullPointerException_in_RouteContextProcessor.patch
>
>
> I am trying to implement a fork/join process where work is read by a single thread, distributed to several parallel threads then joined back to another single thread. I am using Camel SEDA components for interfacing between threads. The single thread at the end must process the messages in the correct order which is why I add a stream resequencer in the route.
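
As a rough illustration of what the resequencing step in this design is meant to achieve, here is a simplified, single-threaded sketch. It is not Camel's StreamResequencer: the class name SimpleResequencer and its methods are hypothetical, and it deliberately ignores time-out, capacity and threading. It only shows the idea of buffering out-of-order sequence numbers and releasing them once the next expected one has arrived.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical, simplified illustration of in-order delivery by sequence number.
// Assumption: sequence numbers are consecutive and start at a known value.
public class SimpleResequencer {
    // Out-of-order elements wait here, smallest sequence number first
    private final PriorityQueue<Long> pending = new PriorityQueue<>();
    // Elements already released in order
    private final List<Long> delivered = new ArrayList<>();
    // The sequence number that must be delivered next
    private long nextExpected;

    public SimpleResequencer(long firstSeqnum) {
        this.nextExpected = firstSeqnum;
    }

    // Buffer the incoming element, then release every element that is now in order.
    public void insert(long seqnum) {
        pending.add(seqnum);
        while (!pending.isEmpty() && pending.peek() == nextExpected) {
            delivered.add(pending.poll());
            nextExpected++;
        }
    }

    public List<Long> delivered() {
        return delivered;
    }

    public static void main(String[] args) {
        SimpleResequencer resequencer = new SimpleResequencer(0);
        // Arrival order as it might come out of parallel workers
        for (long seqnum : new long[] {2, 0, 1, 4, 3}) {
            resequencer.insert(seqnum);
        }
        System.out.println(resequencer.delivered()); // prints [0, 1, 2, 3, 4]
    }
}
```

In the real route the delivery happens on a separate "Resequencer Delivery" thread, which is where the exception in this issue is raised; the sketch above does not model that.
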
> I have built a small prototype that works correctly with a small number of messages (1000 messages, 10 threads in parallel). However, if I increase the number of messages (10000 messages, 10 threads), I start getting null pointer exceptions:
> 01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
>         at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> I have tried adjusting the resequencer time-out and capacity without any result. My CPU is only lightly loaded as there is plenty of sleep in the dummy processor. If anybody could tell me what I am doing wrong I would be very grateful. Here is the complete code of my prototype class:
> package camelTest;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> /**
>  *
>  * Simulation of a fork / join parallel processing.
>  *
>  * Work is created by the main method, distributed (seda:fork) to several
>  * parallel workers, joined back to a single queue (seda:join) and resequenced.
>  *
>  */
> public class MainApp {
>         private static final Logger LOG = LoggerFactory.getLogger(MainApp.class);
>         private static final int NUMBER_OF_MESSAGES = 10000;
>         public static void main(String... args) throws Exception {
>                 CamelContext context = new DefaultCamelContext();
>                 context.addRoutes(new MyRouteBuilder());
>                 context.start();
>                 LOG.info("Started");
>                 ProducerTemplate template = context.createProducerTemplate();
>                 for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
>                         template.sendBodyAndHeader("seda:fork", "Test Message: " + i,
>                                         "seqnum", Long.valueOf(i));
>                 }
>                 long expectedTime = NUMBER_OF_MESSAGES
>                                 * (RandomSleepProcessor.MAX_PROCESS_TIME + RandomSleepProcessor.MIN_PROCESS_TIME)
>                                 / 2 / MyRouteBuilder.CONCURRENCY + MyRouteBuilder.TIMEOUT;
>                 LOG.info("Expected time: {}", expectedTime);
>                 Thread.sleep(expectedTime);
>                 LOG.info("Stopping");
>                 context.stop();
>                 LOG.info("Stopped");
>         }
>         public static class MyRouteBuilder extends RouteBuilder {
>                 // Number of concurrent processing threads
>                 public static final int CONCURRENCY = 10;
>                 // Additional resequencer time-out above theoretical time-out
>                 public static final long SAFETY_TIMEOUT = 100;
>                 // Additional resequencer capacity above theoretical capacity
>                 public static final int SAFETY_CAPACITY = 10;
>                 // Resequencer time-out
>                 public static final long TIMEOUT = SAFETY_TIMEOUT
>                                 + (RandomSleepProcessor.MAX_PROCESS_TIME - RandomSleepProcessor.MIN_PROCESS_TIME);
>                 // Resequencer capacity
>                 public static final int CAPACITY = SAFETY_CAPACITY
>                                 + (int) (CONCURRENCY * TIMEOUT / RandomSleepProcessor.MIN_PROCESS_TIME);
>                 public void configure() {
>                         LOG.info("Number of processor threads: {}", CONCURRENCY);
>                         LOG.info("Resequencer time-out: {}", TIMEOUT);
>                         LOG.info("Resequencer capacity: {}", CAPACITY);
>                         Processor myProcessor = new RandomSleepProcessor();
>                         from("seda:fork?concurrentConsumers=" + CONCURRENCY).process(
>                                         myProcessor).to("seda:join");
>                         from("seda:join").resequence(header("seqnum")).stream()
>                                         .capacity(CAPACITY).timeout(TIMEOUT).to("mock:result");
>                 }
>         }
>         /**
>          * Simulation processor that sleeps a random time between MIN_PROCESS_TIME
>          * and MAX_PROCESS_TIME milliseconds.
>          *
>          */
>         public static class RandomSleepProcessor implements Processor {
>                 public static final long MIN_PROCESS_TIME = 5;
>                 public static final long MAX_PROCESS_TIME = 50;
>                 @Override
>                 public void process(Exchange arg0) throws Exception {
>                         long processTime = (long) (MIN_PROCESS_TIME + Math.random()
>                                         * (MAX_PROCESS_TIME - MIN_PROCESS_TIME));
>                         LOG.debug("Process time: {}", processTime);
>                         Thread.sleep(processTime);
>                 }
>         }
> }
> And here is the full log of a run:
> 01:35:45.632 [main] INFO  camelTest.MainApp - Number of processor threads: 10
> 01:35:45.635 [main] INFO  camelTest.MainApp - Resequencer time-out: 145
> 01:35:45.635 [main] INFO  camelTest.MainApp - Resequencer capacity: 300
> 01:35:45.674 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is starting
> 01:35:45.733 [main] INFO  o.a.c.m.ManagementStrategyFactory - JMX enabled.
> 01:35:45.938 [main] INFO  o.a.c.i.c.DefaultTypeConverter - Loaded 172 type converters
> 01:35:46.333 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[seda://fork?concurrentConsumers=10]
> 01:35:46.348 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://join]
> 01:35:46.348 [main] INFO  o.a.c.m.DefaultManagementLifecycleStrategy - StatisticsLevel at All so enabling load performance statistics
> 01:35:46.360 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
> 01:35:46.360 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.688 seconds
> 01:35:46.360 [main] INFO  camelTest.MainApp - Started
> 01:35:48.061 [main] INFO  camelTest.MainApp - Expected time: 27645
> 01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
>         at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:02.994 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
>         at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:10.913 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
>         at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:15.706 [main] INFO  camelTest.MainApp - Stopping
> 01:36:15.706 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutting down
> 01:36:15.707 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds)
> 01:36:16.778 [Camel (camel-1) thread #14 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete, was consuming from: Endpoint[seda://join]
> 01:36:16.779 [Camel (camel-1) thread #14 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[seda://fork?concurrentConsumers=10]
> 01:36:16.779 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 1 seconds
> 01:36:16.784 [main] INFO  o.a.c.i.c.DefaultTypeConverter - TypeConverterRegistry utilization[attempts=264380, hits=264380, misses=0, failures=0] mappings[total=172, misses=0]
> 01:36:16.786 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutdown in 1.079 seconds. Uptime 31.114 seconds.
> 01:36:16.786 [main] INFO  camelTest.MainApp - Stopped
> Vincent 
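
For reference, the time-out, capacity and expected-time values reported in the log above follow directly from the constants in the prototype. The stand-alone recomputation below copies the constants and formulas from MainApp; the class name ResequencerSizing and the method names are made up for this sketch.

```java
// Hypothetical helper that recomputes the sizing values logged by the prototype.
// Constants and formulas are copied from MainApp; only the class and method names are new.
public class ResequencerSizing {
    static final int NUMBER_OF_MESSAGES = 10000;
    static final int CONCURRENCY = 10;
    static final long SAFETY_TIMEOUT = 100;
    static final int SAFETY_CAPACITY = 10;
    static final long MIN_PROCESS_TIME = 5;
    static final long MAX_PROCESS_TIME = 50;

    // Resequencer time-out: safety margin plus the spread of processing times
    static long timeout() {
        return SAFETY_TIMEOUT + (MAX_PROCESS_TIME - MIN_PROCESS_TIME);
    }

    // Resequencer capacity: safety margin plus the messages that can complete within one time-out
    static int capacity() {
        return SAFETY_CAPACITY + (int) (CONCURRENCY * timeout() / MIN_PROCESS_TIME);
    }

    // Expected run time: average processing time spread over the workers, plus the time-out
    static long expectedTime() {
        return NUMBER_OF_MESSAGES * (MAX_PROCESS_TIME + MIN_PROCESS_TIME) / 2 / CONCURRENCY
                + timeout();
    }

    public static void main(String[] args) {
        System.out.println("Resequencer time-out: " + timeout());   // 145
        System.out.println("Resequencer capacity: " + capacity());  // 300
        System.out.println("Expected time: " + expectedTime());     // 27645
    }
}
```

These match the logged values (145, 300 and 27645), so the configuration in the failing run is the one the formulas intend.
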
