You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Vincent Lombart (JIRA)" <ji...@apache.org> on 2013/01/03 19:22:12 UTC
[jira] [Created] (CAMEL-5925) NullPointerException in
RouteContextProcessor
Vincent Lombart created CAMEL-5925:
--------------------------------------
Summary: NullPointerException in RouteContextProcessor
Key: CAMEL-5925
URL: https://issues.apache.org/jira/browse/CAMEL-5925
Project: Camel
Issue Type: Bug
Components: camel-core
Affects Versions: 2.10.3
Environment: Ubuntu Linux 12.04 (amd64), openjdk 6 (6b24-1.11.5-0ubuntu1~12.04.1)
Reporter: Vincent Lombart
I am trying to implement a fork/join process where work is read by a single thread, distributed to several parallel threads then joined back to another single thread. I am using Camel SEDA components for interfacing between threads. The single thread at the end must process the messages in the correct order which is why I add a stream resequencer in the route.
I have built a small prototype that works correctly with a small number of messages (1000 messages, 10 threads in parallel). However if I increase the number of messages (10000 messages, 10 threads) I start getting null pointer exceptions:
01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
I have tried adjusting the resequencer time-out and capacity without any result. My CPU is only lightly loaded as there is plenty of sleep in the dummy processor. If anybody could tell me what I am doing wrong I would be very grateful. Here is the complete code of my prototype class:
package camelTest;
import org.apache.camel.CamelContext;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
*
* Simulation of a fork / join parallel processing.
*
* Work is created by the main method, distributed (seda:fork) to several
* parallel workers, joined back to a single queue (seda:join) and resequenced.
*
*/
public class MainApp {
private static final Logger LOG = LoggerFactory.getLogger(MainApp.class);
private static final int NUMBER_OF_MESSAGES = 10000;
public static void main(String... args) throws Exception {
CamelContext context = new DefaultCamelContext();
context.addRoutes(new MyRouteBuilder());
context.start();
LOG.info("Started");
ProducerTemplate template = context.createProducerTemplate();
for (int i = 0; i < NUMBER_OF_MESSAGES; i++) {
template.sendBodyAndHeader("seda:fork", "Test Message: " + i,
"seqnum", new Long(i));
}
long expectedTime = NUMBER_OF_MESSAGES
* (RandomSleepProcessor.MAX_PROCESS_TIME + RandomSleepProcessor.MIN_PROCESS_TIME)
/ 2 / MyRouteBuilder.CONCURRENCY + MyRouteBuilder.TIMEOUT;
LOG.info("Expected time: {}", expectedTime);
Thread.sleep(expectedTime);
LOG.info("Stopping");
context.stop();
LOG.info("Stopped");
}
public static class MyRouteBuilder extends RouteBuilder {
// Number of concurrent processing threads
public static final int CONCURRENCY = 10;
// Additional resequencer time-out above theoretical time-out
public static final long SAFETY_TIMEOUT = 100;
// Additional resequencer capacity above theoretical capacity
public static final int SAFETY_CAPACITY = 10;
// Resequencer time-out
public static final long TIMEOUT = SAFETY_TIMEOUT
+ (RandomSleepProcessor.MAX_PROCESS_TIME - RandomSleepProcessor.MIN_PROCESS_TIME);
// Resequencer capacity
public static final int CAPACITY = SAFETY_CAPACITY
+ (int) (CONCURRENCY * TIMEOUT / RandomSleepProcessor.MIN_PROCESS_TIME);
public void configure() {
LOG.info("Number of processor threads: {}", CONCURRENCY);
LOG.info("Resequencer time-out: {}", TIMEOUT);
LOG.info("Resequencer capacity: {}", CAPACITY);
Processor myProcessor = new RandomSleepProcessor();
from("seda:fork?concurrentConsumers=" + CONCURRENCY).process(
myProcessor).to("seda:join");
from("seda:join").resequence(header("seqnum")).stream()
.capacity(CAPACITY).timeout(TIMEOUT).to("mock:result");
}
}
/**
* Simulation processor that sleeps a random time between MIN_PROCESS_TIME
* and MAX_PROCESS_TIME milliseconds.
*
*/
public static class RandomSleepProcessor implements Processor {
public static final long MIN_PROCESS_TIME = 5;
public static final long MAX_PROCESS_TIME = 50;
@Override
public void process(Exchange arg0) throws Exception {
long processTime = (long) (MIN_PROCESS_TIME + Math.random()
* (MAX_PROCESS_TIME - MIN_PROCESS_TIME));
LOG.debug("Process time: {}", processTime);
Thread.sleep(processTime);
}
}
}
And here is the full log of a run:
01:35:45.632 [main] INFO camelTest.MainApp - Number of processor threads: 10
01:35:45.635 [main] INFO camelTest.MainApp - Resequencer time-out: 145
01:35:45.635 [main] INFO camelTest.MainApp - Resequencer capacity: 300
01:35:45.674 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is starting
01:35:45.733 [main] INFO o.a.c.m.ManagementStrategyFactory - JMX enabled.
01:35:45.938 [main] INFO o.a.c.i.c.DefaultTypeConverter - Loaded 172 type converters
01:35:46.333 [main] INFO o.a.camel.impl.DefaultCamelContext - Route: route1 started and consuming from: Endpoint[seda://fork?concurrentConsumers=10]
01:35:46.348 [main] INFO o.a.camel.impl.DefaultCamelContext - Route: route2 started and consuming from: Endpoint[seda://join]
01:35:46.348 [main] INFO o.a.c.m.DefaultManagementLifecycleStrategy - StatisticsLevel at All so enabling load performance statistics
01:35:46.360 [main] INFO o.a.camel.impl.DefaultCamelContext - Total 2 routes, of which 2 is started.
01:35:46.360 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) started in 0.688 seconds
01:35:46.360 [main] INFO camelTest.MainApp - Started
01:35:48.061 [main] INFO camelTest.MainApp - Expected time: 27645
01:35:49.719 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:02.994 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:10.913 [Camel (camel-1) thread #0 - Resequencer Delivery] WARN o.a.c.processor.StreamResequencer - Caused by: [java.lang.NullPointerException - null]
java.lang.NullPointerException: null
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:42) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:303) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:290) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:185) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer.sendElement(StreamResequencer.java:65) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.resequencer.ResequencerEngine.deliverNext(ResequencerEngine.java:261) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.resequencer.ResequencerEngine.deliver(ResequencerEngine.java:225) ~[camel-core-2.10.3.jar:2.10.3]
at org.apache.camel.processor.StreamResequencer$Delivery.run(StreamResequencer.java:242) ~[camel-core-2.10.3.jar:2.10.3]
01:36:15.706 [main] INFO camelTest.MainApp - Stopping
01:36:15.706 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutting down
01:36:15.707 [main] INFO o.a.c.impl.DefaultShutdownStrategy - Starting to graceful shutdown 2 routes (timeout 300 seconds)
01:36:16.778 [Camel (camel-1) thread #14 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Route: route2 shutdown complete, was consuming from: Endpoint[seda://join]
01:36:16.779 [Camel (camel-1) thread #14 - ShutdownTask] INFO o.a.c.impl.DefaultShutdownStrategy - Route: route1 shutdown complete, was consuming from: Endpoint[seda://fork?concurrentConsumers=10]
01:36:16.779 [main] INFO o.a.c.impl.DefaultShutdownStrategy - Graceful shutdown of 2 routes completed in 1 seconds
01:36:16.784 [main] INFO o.a.c.i.c.DefaultTypeConverter - TypeConverterRegistry utilization[attempts=264380, hits=264380, misses=0, failures=0] mappings[total=172, misses=0]
01:36:16.786 [main] INFO o.a.camel.impl.DefaultCamelContext - Apache Camel 2.10.3 (CamelContext: camel-1) is shutdown in 1.079 seconds. Uptime 31.114 seconds.
01:36:16.786 [main] INFO camelTest.MainApp - Stopped
Vincent
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira