You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Taariq Levack (JIRA)" <ji...@apache.org> on 2013/01/04 06:54:12 UTC

[jira] [Assigned] (CAMEL-5925) NullPointerException in RouteContextProcessor

     [ https://issues.apache.org/jira/browse/CAMEL-5925?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Taariq Levack reassigned CAMEL-5925:
------------------------------------

    Assignee: Taariq Levack
    
> NullPointerException in RouteContextProcessor
> ---------------------------------------------
>
>                 Key: CAMEL-5925
>                 URL: https://issues.apache.org/jira/browse/CAMEL-5925
>             Project: Camel
>          Issue Type: Bug
>          Components: camel-core
>    Affects Versions: 2.10.3
>         Environment: Ubuntu Linux 12.04 (amd64), openjdk 6 (6b24-1.11.5-0ubuntu1~12.04.1)
>            Reporter: Vincent Lombart
>            Assignee: Taariq Levack
>
> I am trying to implement a fork/join process where work is read by a single thread, distributed to several parallel threads then joined back to another single thread. I am using Camel SEDA components for interfacing between threads. The single thread at the end must process the messages in the correct order which is why I add a stream resequencer in the route.
> I have built a small prototype that works correctly with a small number of messages (1000 messages, 10 threads in parallel). However if I increase the number of messages (10000 messages, 10 threads) I start getting null pointer exceptions:
> 01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
>         at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> I have tried adjusting the resequencer time-out and capacity without any result. My CPU is only lightly loaded as there is plenty of sleep in the dummy processor. If anybody could tell me what I am doing wrong I would be very grateful. Here is the complete code of my prototype class:
> package camelTest;
> import org.apache.camel.CamelContext;
> import org.apache.camel.Exchange;
> import org.apache.camel.Processor;
> import org.apache.camel.ProducerTemplate;
> import org.apache.camel.builder.RouteBuilder;
> import org.apache.camel.impl.DefaultCamelContext;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> /**
>  *
>  * Simulation of a fork / join parallel processing.
>  *
>  * Work is created by the main method, distributed (seda:fork) to several
>  * parallel workers, joined back to a single queue (seda:join) and resequenced.
>  *
>  */
> public class MainApp {
>         private static final Logger LOG = LoggerFactory.getLogger(MainApp.class);
>         private static final int NUMBER_OF_MESSAGES = 10000;
>         public static void main(String... args) throws Exception {
>                 CamelContext context = new DefaultCamelContext();
>                 context.addRoutes(new MyRouteBuilder());
>                 context.start();
>                 LOG.info("Started");
>                 ProducerTemplate template = context.createProducerTemplate();
>                 for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
>                         template.sendBodyAndHeader("seda:fork", "Test Message: " + i,
>                                         "seqnum", new Long(i));
>                 }
>                 long expectedTime = NUMBER_OF_MESSAGES
>                                 * (RandomSleepProcessor.MAX_PROCESS_TIME + RandomSleepProcessor.MIN_PROCESS_TIME)
>                                 / 2 / MyRouteBuilder.CONCURRENCY + MyRouteBuilder.TIMEOUT;
>                 LOG.info("Expected time: {}", expectedTime);
>                 Thread.sleep(expectedTime);
>                 LOG.info("Stopping");
>                 context.stop();
>                 LOG.info("Stopped");
>         }
>         public static class MyRouteBuilder extends RouteBuilder {
>                 // Number of concurrent processing threads
>                 public static final int CONCURRENCY = 10;
>                 // Additional resequencer time-out above theoretical time-out
>                 public static final long SAFETY_TIMEOUT = 100;
>                 // Additional resequencer capacity above theoretical capacity
>                 public static final int SAFETY_CAPACITY = 10;
>                 // Resequencer time-out
>                 public static final long TIMEOUT = SAFETY_TIMEOUT
>                                 + (RandomSleepProcessor.MAX_PROCESS_TIME - RandomSleepProcessor.MIN_PROCESS_TIME);
>                 // Resequencer capacity
>                 public static final int CAPACITY = SAFETY_CAPACITY
>                                 + (int) (CONCURRENCY * TIMEOUT / RandomSleepProcessor.MIN_PROCESS_TIME);
>                 public void configure() {
>                         LOG.info("Number of processor threads: {}", CONCURRENCY);
>                         LOG.info("Resequencer time-out: {}", TIMEOUT);
>                         LOG.info("Resequencer capacity: {}", CAPACITY);
>                         Processor myProcessor = new RandomSleepProcessor();
>                         from("seda:fork?concurrentConsumers=" + CONCURRENCY).process(
>                                         myProcessor).to("seda:join");
>                         from("seda:join").resequence(header("seqnum")).stream()
>                                         .capacity(CAPACITY).timeout(TIMEOUT).to("mock:result");
>                 }
>         }
>         /**
>          * Simulation processor that sleeps a random time between MIN_PROCESS_TIME
>          * and MAX_PROCESS_TIME milliseconds.
>          *
>          */
>         public static class RandomSleepProcessor implements Processor {
>                 public static final long MIN_PROCESS_TIME = 5;
>                 public static final long MAX_PROCESS_TIME = 50;
>                 @Override
>                 public void process(Exchange arg0) throws Exception {
>                         long processTime = (long) (MIN_PROCESS_TIME + Math.random()
>                                         * (MAX_PROCESS_TIME - MIN_PROCESS_TIME));
>                         LOG.debug("Process time: {}", processTime);
>                         Thread.sleep(processTime);
>                 }
>         }
> }
> And here is the full log of a run:
> 01:35:45.632 [main] INFO  camelTest.MainApp - Number of processor threads: 10
> 01:35:45.635 [main] INFO  camelTest.MainApp - Resequencer time-out: 145
> 01:35:45.635 [main] INFO  camelTest.MainApp - Resequencer capacity: 300
> 01:35:45.674 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is starting
> 01:35:45.733 [main] INFO  o.a.c.m.ManagementStrategyFactory - JMX enabled.
> 01:35:45.938 [main] INFO  o.a.c.i.c.DefaultTypeConverter - Loaded 172 type converters
> 01:35:46.333 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[seda://fork?concurrentConsumers=10]
> 01:35:46.348 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://join]
> 01:35:46.348 [main] INFO  o.a.c.m.DefaultManagementLifecycleStrategy - StatisticsLevel at All so enabling load performance statistics
> 01:35:46.360 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
> 01:35:46.360 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.688 seconds
> 01:35:46.360 [main] INFO  camelTest.MainApp - Started
> 01:35:48.061 [main] INFO  camelTest.MainApp - Expected time: 27645
> 01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
>         at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:02.994 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
>         at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:10.913 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
> java.lang.NullPointerException: null
>         at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
>         at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
> 01:36:15.706 [main] INFO  camelTest.MainApp - Stopping
> 01:36:15.706 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutting down
> 01:36:15.707 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds)
> 01:36:16.778 [Camel (camel-1) thread #14 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete, was consuming from: Endpoint[seda://join]
> 01:36:16.779 [Camel (camel-1) thread #14 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[seda://fork?concurrentConsumers=10]
> 01:36:16.779 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 1 seconds
> 01:36:16.784 [main] INFO  o.a.c.i.c.DefaultTypeConverter - TypeConverterRegistry utilization[attempts=264380, hits=264380, misses=0, failures=0] mappings[total=172, misses=0]
> 01:36:16.786 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutdown in 1.079 seconds. Uptime 31.114 seconds.
> 01:36:16.786 [main] INFO  camelTest.MainApp - Stopped
> Vincent 

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira