Posted to issues@camel.apache.org by "Vincent Lombart (JIRA)" <ji...@apache.org> on 2013/01/03 19:22:12 UTC

[jira] [Created] (CAMEL-5925) NullPointerException in RouteContextProcessor

Vincent Lombart created CAMEL-5925:
--------------------------------------

             Summary: NullPointerException in RouteContextProcessor
                 Key: CAMEL-5925
                 URL: https://issues.apache.org/jira/browse/CAMEL-5925
             Project: Camel
          Issue Type: Bug
          Components: camel-core
    Affects Versions: 2.10.3
         Environment: Ubuntu Linux 12.04 (amd64), openjdk 6 (6b24-1.11.5-0ubuntu1~12.04.1)
            Reporter: Vincent Lombart


I am trying to implement a fork/join process where work is read by a single thread, distributed to several parallel threads, and then joined back to another single thread. I am using Camel SEDA components to pass messages between threads. The single thread at the end must process the messages in the correct order, which is why I add a stream resequencer to the route.
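
To make the intent of the join step concrete, here is a minimal plain-Java sketch of in-order delivery keyed on a sequence number. It is only an illustration of the idea: the class name InOrderBuffer and its methods are hypothetical, it is not Camel's StreamResequencer, and it deliberately leaves out the time-out, the capacity limit and any thread safety.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/**
 * Hypothetical, simplified in-order buffer (an assumption for illustration).
 * NOT Camel's StreamResequencer: no time-out, no capacity, not thread-safe.
 */
final class InOrderBuffer {
    // Messages that arrived early, sorted by sequence number.
    private final TreeMap<Long, String> pending = new TreeMap<Long, String>();
    // Sequence number of the next message allowed to leave the buffer.
    private long nextExpected;

    InOrderBuffer(long firstSeqnum) {
        this.nextExpected = firstSeqnum;
    }

    /** Accepts one message and returns every message that can now be delivered in order. */
    List<String> offer(long seqnum, String body) {
        pending.put(seqnum, body);
        List<String> ready = new ArrayList<String>();
        // Release the head of the buffer while it matches the next expected number.
        while (!pending.isEmpty() && pending.firstKey() == nextExpected) {
            ready.add(pending.pollFirstEntry().getValue());
            nextExpected++;
        }
        return ready;
    }

    /** Number of messages still waiting for a predecessor. */
    int size() {
        return pending.size();
    }

    public static void main(String[] args) {
        InOrderBuffer buffer = new InOrderBuffer(0);
        System.out.println(buffer.offer(2, "Test Message: 2")); // held back
        System.out.println(buffer.offer(1, "Test Message: 1")); // held back
        System.out.println(buffer.offer(0, "Test Message: 0")); // releases 0, 1, 2
    }
}
```

In this simplified form a missing sequence number blocks delivery forever; a time-out and a capacity, as configured on the stream resequencer in the route further down, are the usual way to bound that wait.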

I have built a small prototype that works correctly with a small number of messages (1000 messages, 10 threads in parallel). However, if I increase the number of messages (10000 messages, 10 threads), I start getting null pointer exceptions:
01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]

I have tried adjusting the resequencer time-out and capacity, without any effect. My CPU is only lightly loaded, as the dummy processor spends most of its time sleeping. If anybody could tell me what I am doing wrong, I would be very grateful. Here is the complete code of my prototype class:

package camelTest;

import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 *
 * Simulation of a fork / join parallel processing.
 *
 * Work is created by the main method, distributed (seda:fork) to several
 * parallel workers, joined back to a single queue (seda:join) and resequenced.
 *
 */
public class MainApp {

        private static final Logger LOG = LoggerFactory.getLogger(MainApp.class);
        private static final int NUMBER_OF_MESSAGES = 10000;

        public static void main(String... args) throws Exception {
                CamelContext context = new DefaultCamelContext();
                context.addRoutes(new MyRouteBuilder());
                context.start();

                LOG.info("Started");

                ProducerTemplate template = context.createProducerTemplate();
                for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
                        // Long.valueOf avoids allocating a new Long for every message
                        template.sendBodyAndHeader("seda:fork", "Test Message: " + i,
                                        "seqnum", Long.valueOf(i));
                }

                long expectedTime = NUMBER_OF_MESSAGES
                                * (RandomSleepProcessor.MAX_PROCESS_TIME + RandomSleepProcessor.MIN_PROCESS_TIME)
                                / 2 / MyRouteBuilder.CONCURRENCY + MyRouteBuilder.TIMEOUT;
                LOG.info("Expected time: {}", expectedTime);
                Thread.sleep(expectedTime);
                LOG.info("Stopping");
                context.stop();
                LOG.info("Stopped");

        }

        public static class MyRouteBuilder extends RouteBuilder {
                // Number of concurrent processing threads
                public static final int CONCURRENCY = 10;

                // Additional resequencer time-out above theoretical time-out
                public static final long SAFETY_TIMEOUT = 100;

                // Additional resequencer capacity above theoretical capacity
                public static final int SAFETY_CAPACITY = 10;

                // Resequencer time-out
                public static final long TIMEOUT = SAFETY_TIMEOUT
                                + (RandomSleepProcessor.MAX_PROCESS_TIME - RandomSleepProcessor.MIN_PROCESS_TIME);

                // Resequencer capacity
                public static final int CAPACITY = SAFETY_CAPACITY
                                + (int) (CONCURRENCY * TIMEOUT / RandomSleepProcessor.MIN_PROCESS_TIME);

                public void configure() {
                        LOG.info("Number of processor threads: {}", CONCURRENCY);
                        LOG.info("Resequencer time-out: {}", TIMEOUT);
                        LOG.info("Resequencer capacity: {}", CAPACITY);
                        Processor myProcessor = new RandomSleepProcessor();
                        from("seda:fork?concurrentConsumers=" + CONCURRENCY).process(
                                        myProcessor).to("seda:join");
                        from("seda:join").resequence(header("seqnum")).stream()
                                        .capacity(CAPACITY).timeout(TIMEOUT).to("mock:result");
                }

        }

        /**
         * Simulation processor that sleeps a random time between MIN_PROCESS_TIME
         * and MAX_PROCESS_TIME milliseconds.
         *
         */
        public static class RandomSleepProcessor implements Processor {
                public static final long MIN_PROCESS_TIME = 5;
                public static final long MAX_PROCESS_TIME = 50;

                @Override
                public void process(Exchange exchange) throws Exception {
                        long processTime = (long) (MIN_PROCESS_TIME + Math.random()
                                        * (MAX_PROCESS_TIME - MIN_PROCESS_TIME));
                        LOG.debug("Process time: {}", processTime);
                        Thread.sleep(processTime);
                }

        }

}
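
The derived constants in the class above can be checked in isolation. The following stand-alone sketch repeats the same integer arithmetic with the same values; the class name TimingCheck and its method names are assumptions made for this illustration and are not part of the prototype. For 10000 messages it yields a time-out of 145 ms, a capacity of 300 and an expected run time of 27645 ms, which agrees with the values reported in the log of the run.

```java
/**
 * Hypothetical helper (an assumption for illustration) that recomputes the
 * resequencer settings of the prototype with the same integer arithmetic.
 */
final class TimingCheck {
    static final long MIN_PROCESS_TIME = 5;   // ms, as in RandomSleepProcessor
    static final long MAX_PROCESS_TIME = 50;  // ms, as in RandomSleepProcessor
    static final int CONCURRENCY = 10;        // parallel consumers on seda:fork
    static final long SAFETY_TIMEOUT = 100;   // ms added on top of the theoretical time-out
    static final int SAFETY_CAPACITY = 10;    // slots added on top of the theoretical capacity

    /** Largest spread between two processing times, plus the safety margin. */
    static long timeout() {
        return SAFETY_TIMEOUT + (MAX_PROCESS_TIME - MIN_PROCESS_TIME);
    }

    /** Messages that can finish within one time-out at the fastest rate, plus the safety margin. */
    static int capacity() {
        return SAFETY_CAPACITY + (int) (CONCURRENCY * timeout() / MIN_PROCESS_TIME);
    }

    /** Average processing time per message, spread over the workers, plus one time-out. */
    static long expectedTime(int numberOfMessages) {
        return numberOfMessages * (MAX_PROCESS_TIME + MIN_PROCESS_TIME)
                / 2 / CONCURRENCY + timeout();
    }

    public static void main(String[] args) {
        // prints "145 300 27645"
        System.out.println(timeout() + " " + capacity() + " " + expectedTime(10000));
    }
}
```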

And here is the full log of a run:
01:35:45.632 [main] INFO  camelTest.MainApp - Number of processor threads: 10
01:35:45.635 [main] INFO  camelTest.MainApp - Resequencer time-out: 145
01:35:45.635 [main] INFO  camelTest.MainApp - Resequencer capacity: 300
01:35:45.674 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is starting
01:35:45.733 [main] INFO  o.a.c.m.ManagementStrategyFactory - JMX enabled.
01:35:45.938 [main] INFO  o.a.c.i.c.DefaultTypeConverter - Loaded 172 type converters
01:35:46.333 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[seda://fork?concurrentConsumers=10]
01:35:46.348 [main] INFO  o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://join]
01:35:46.348 [main] INFO  o.a.c.m.DefaultManagementLifecycleStrategy - StatisticsLevel at All so enabling load performance statistics
01:35:46.360 [main] INFO  o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
01:35:46.360 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.688 seconds
01:35:46.360 [main] INFO  camelTest.MainApp - Started
01:35:48.061 [main] INFO  camelTest.MainApp - Expected time: 27645
01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:02.994 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:10.913 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN  o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
        at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
        at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:15.706 [main] INFO  camelTest.MainApp - Stopping
01:36:15.706 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutting down
01:36:15.707 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds)
01:36:16.778 [Camel (camel-1) thread #14 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete, was consuming from: Endpoint[seda://join]
01:36:16.779 [Camel (camel-1) thread #14 - ShutdownTask] INFO  o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[seda://fork?concurrentConsumers=10]
01:36:16.779 [main] INFO  o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 1 seconds
01:36:16.784 [main] INFO  o.a.c.i.c.DefaultTypeConverter - TypeConverterRegistry utilization[attempts=264380, hits=264380, misses=0, failures=0] mappings[total=172, misses=0]
01:36:16.786 [main] INFO  o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutdown in 1.079 seconds. Uptime 31.114 seconds.
01:36:16.786 [main] INFO  camelTest.MainApp - Stopped

Vincent 
