You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@camel.apache.org by "Joshua Groom (JIRA)" <ji...@apache.org> on 2013/09/20 08:17:51 UTC
[jira] [Created] (CAMEL-6771) ConcurrentModificationException
thrown from inside camel splitter
Joshua Groom created CAMEL-6771:
-----------------------------------
Summary: ConcurrentModificationException thrown from inside camel splitter
Key: CAMEL-6771
URL: https://issues.apache.org/jira/browse/CAMEL-6771
Project: Camel
Issue Type: Bug
Components: camel-core
Affects Versions: 2.11.1
Environment: Amazon Linux, oracle jdk 1.7
Reporter: Joshua Groom
We use camel 2.11.1 running on the oracle 1.7 jvm for linux.
I have a route that looks like this. It reads in files and puts them on a seda queue with 8 concurrent consumers.
- The SpatialInterpolationPojo reads each file is read and split into two messages X and Y.
- The MyAggregator uses X and Y together and outputs a combined message A.B
- The MySplitterPojo splits A.B into two messages A and B
from("file://somefile")
.to("seda:filteraccept?concurrentConsumers=8");
from("seda:filteraccept?concurrentConsumers=8")
.split()
.method(new SpatialInterpolationPojo(), "split")
.to("direct:wind-aggregator");
from("direct:wind-aggregator")
.aggregate(packageCorrelationId(), new MyAggregator())
.completionPredicate(header(FIELD_AGGREGATION_COMPLETE).isNotNull())
.split()
.method(new MySplitterPojo())
.to("seda:output");
The MySplitterPojo simply returns List<Message> containing two messages that come from data in the input message body. We copy the body headers to the result messages.
It is thread safe, it has no state, ie there are no object fields that are modified.
The method is like this it is edited for clarity/privacy:
public class MySplitterPojo {
public List<Message> splitMessage(
@Headers Map<String, Object> headers,
@Body CombinedObject body) {
DefaultMessage a = new DefaultMessage();
a.setBody(body.getA());
a.setHeaders(new HashMap<String, Object>(headers));
DefaultMessage b = new DefaultMessage();
b.setBody(body.getB());
b.setHeaders(new HashMap<String, Object>(headers));
ArrayList<Message> result = new ArrayList<Message>(2);
result.add(a);
result.add(b);
return result;
}
}
When we run this route we very occasionally get the exception below. You can see that it is entirely within camel, it appears to be trying to copy the map stored under the exchange property Exchange.AGGREGATION_STRATEGY which is a camel internal property key.
By inspection of the message I can see that Exchange has just come out of the WindVectorAggregator.
This seems like it must be a camel bug to me. Any ideas?
15 Sep 2013 23:06:47,140[Camel (camel-1) thread #21 - seda://filteraccept] WARN AggregateProcessor Error processing aggregated exchange. Exchange[Message: { Trondheim, NO=WindVector [u=-5.92894983291626, v=7.060009002685547], ... }]. Caused by: [java.util.ConcurrentModificationException - null]
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextEntry(Unknown Source)
at java.util.HashMap$EntryIterator.next(Unknown Source)
at java.util.HashMap$EntryIterator.next(Unknown Source)
at java.util.HashMap.putAllForCreate(Unknown Source)
at java.util.HashMap.<init>(Unknown Source)
at org.apache.camel.processor.MulticastProcessor.setAggregationStrategyOnExchange(MulticastProcessor.java:1011)
at org.apache.camel.processor.Splitter.process(Splitter.java:95)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:86)
at org.apache.camel.processor.aggregate.AggregateProcessor$1.run(AggregateProcessor.java:495)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at org.apache.camel.util.concurrent.SynchronousExecutorService.execute(SynchronousExecutorService.java:62)
at java.util.concurrent.AbstractExecutorService.submit(Unknown Source)
at org.apache.camel.processor.aggregate.AggregateProcessor.onSubmitCompletion(AggregateProcessor.java:487)
at org.apache.camel.processor.aggregate.AggregateProcessor.onCompletion(AggregateProcessor.java:471)
at org.apache.camel.processor.aggregate.AggregateProcessor.doAggregation(AggregateProcessor.java:325)
at org.apache.camel.processor.aggregate.AggregateProcessor.process(AggregateProcessor.java:229)
at org.apache.camel.util.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:61)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)
at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504)
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RecipientList.sendToRecipientList(RecipientList.java:151)
at org.apache.camel.component.bean.MethodInfo$1.doProceed(MethodInfo.java:285)
at org.apache.camel.component.bean.MethodInfo$1.proceed(MethodInfo.java:251)
at org.apache.camel.component.bean.BeanProcessor.process(BeanProcessor.java:161)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:122)
at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:60)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.SendProcessor$2.doInAsyncProducer(SendProcessor.java:122)
at org.apache.camel.impl.ProducerCache.doInAsyncProducer(ProducerCache.java:298)
at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:117)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:571)
at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:504)
at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:213)
at org.apache.camel.processor.Splitter.process(Splitter.java:98)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.BacklogTracerInterceptor.process(BacklogTracerInterceptor.java:84)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.TraceInterceptor.process(TraceInterceptor.java:91)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.RedeliveryErrorHandler.processErrorHandler(RedeliveryErrorHandler.java:391)
at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:273)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.interceptor.DefaultChannel.process(DefaultChannel.java:335)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:117)
at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)
at org.apache.camel.processor.RouteContextProcessor.processNext(RouteContextProcessor.java:46)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.processor.UnitOfWorkProcessor.processAsync(UnitOfWorkProcessor.java:150)
at org.apache.camel.processor.UnitOfWorkProcessor.process(UnitOfWorkProcessor.java:117)
at org.apache.camel.processor.RouteInflightRepositoryProcessor.processNext(RouteInflightRepositoryProcessor.java:48)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.processor.DelegateAsyncProcessor.processNext(DelegateAsyncProcessor.java:99)
at org.apache.camel.processor.DelegateAsyncProcessor.process(DelegateAsyncProcessor.java:90)
at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)
at org.apache.camel.util.AsyncProcessorHelper.process(AsyncProcessorHelper.java:73)
at org.apache.camel.component.seda.SedaConsumer.sendToConsumers(SedaConsumer.java:294)
at org.apache.camel.component.seda.SedaConsumer.doRun(SedaConsumer.java:203)
at org.apache.camel.component.seda.SedaConsumer.run(SedaConsumer.java:150)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira