Posted to users@camel.apache.org by Bengt Rodehav <be...@rodehav.com> on 2014/03/14 17:10:31 UTC

sjms and multiple threads

I'm using Camel 2.12.3 and the sjms component for communicating with
Weblogic JMS.

Everything works fine when I use a single thread. However, to increase
throughput I want multiple threads to read messages from the queue.

I've done this by using the "threads()" method:

  from(sjms:...).threads(10)....

However I get an exception as follows:

java.lang.Exception: Unable to complete sending the message: Only one thread may use a JMS Session at a time.:Only one thread may use a JMS Session at a time.
    at org.apache.camel.component.sjms.producer.InOnlyProducer.sendMessage(InOnlyProducer.java:135)[129:org.apache.camel.camel-sjms:2.12.3]
    at org.apache.camel.component.sjms.SjmsProducer.process(SjmsProducer.java:180)[129:org.apache.camel.camel-sjms:2.12.3]
    at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:110)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:574)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.MulticastProcessor.doProcessSequential(MulticastProcessor.java:507)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.MulticastProcessor.process(MulticastProcessor.java:216)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.Splitter.process(Splitter.java:98)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.ChoiceProcessor.process(ChoiceProcessor.java:111)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.management.InstrumentationProcessor.process(InstrumentationProcessor.java:72)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.RedeliveryErrorHandler.process(RedeliveryErrorHandler.java:398)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.CamelInternalProcessor.process(CamelInternalProcessor.java:191)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:80)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.Pipeline.process(Pipeline.java:118)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.Pipeline.access$100(Pipeline.java:43)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.Pipeline$1.done(Pipeline.java:136)[106:org.apache.camel.camel-core:2.12.3]
    at org.apache.camel.processor.ThreadsProcessor$ProcessCall.run(ThreadsProcessor.java:83)[106:org.apache.camel.camel-core:2.12.3]
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:441)[:1.6.0_32]
    at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)[:1.6.0_32]
    at java.util.concurrent.FutureTask.run(FutureTask.java:138)[:1.6.0_32]
    at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)[:1.6.0_32]
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)[:1.6.0_32]
    at java.lang.Thread.run(Thread.java:662)[:1.6.0_32]

What exactly does this mean and how can I consume messages via sjms using
multiple threads?

/Bengt
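
The exception text says that a JMS session was entered by a second thread while another thread was still using it. A JMS Session is designed for use by one thread at a time, so a client library may guard against concurrent use. The plain-Java sketch below shows the kind of guard that could produce such a message. It is an illustration only, not the WebLogic or sjms code; GuardedSession and its methods are hypothetical names.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicReference;

/** Hypothetical stand-in for a session that rejects concurrent use. */
final class GuardedSession {
    // Thread currently inside send(), or null when the session is idle.
    private final AtomicReference<Thread> owner = new AtomicReference<>();

    void send(Runnable work) {
        // Only the thread that wins the compare-and-set may proceed.
        if (!owner.compareAndSet(null, Thread.currentThread())) {
            throw new IllegalStateException("Only one thread may use a session at a time");
        }
        try {
            work.run();
        } finally {
            owner.set(null); // release the session for the next caller
        }
    }

    /** Returns true if a second thread is rejected while the first is still sending. */
    static boolean secondThreadRejected() {
        GuardedSession session = new GuardedSession();
        CountDownLatch inside = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);

        // First thread enters send() and stays there until released.
        Thread first = new Thread(() -> session.send(() -> {
            inside.countDown();
            awaitQuietly(release);
        }));
        first.start();
        awaitQuietly(inside);

        boolean rejected = false;
        try {
            session.send(() -> { }); // second caller while the first is still inside
        } catch (IllegalStateException e) {
            rejected = true;
        }

        release.countDown();
        try {
            first.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    private static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

With threads(10) in the route, several worker threads can reach the same producer at once, which is the situation the second caller in this sketch represents.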

Re: sjms and multiple threads

Posted by Scott England-Sullivan <su...@gmail.com>.
Hey Charles,

Thanks for the test case.  I appreciate it.

Best Regards,
ses



On Mon, Mar 17, 2014 at 5:27 AM, Charles Moulliard <ch...@gmail.com> wrote:

> Commit available here :
>
> https://git-wip-us.apache.org/repos/asf?p=camel.git;a=commit;h=1a7b676e1b5c29b652e3faf16240ee3cc831a0c9
>
>
> On Mon, Mar 17, 2014 at 11:20 AM, Charles Moulliard <ch007m@gmail.com> wrote:
>
> > I will commit a unit test soon. That could help us verify whether something
> > is going wrong.
> >
> >
> > On Mon, Mar 17, 2014 at 9:38 AM, Bengt Rodehav <be...@rodehav.com> wrote:
> >
> >> An update.
> >>
> >> I noticed in the stack trace that it seems to be the *production* of
> >> messages that gets the exception "Unable to complete sending the message:
> >> Only one thread may use a JMS Session at a time." - not the *consumption*.
> >> I also only showed you the first part of the route (because I didn't think
> >> the rest mattered). What I do is basically read XML messages from one
> >> queue, transform them and then send them to another queue.
> >>
> >> Moreover, one incoming message may end up creating more than one (in this
> >> case 3) outgoing messages. I'm using a splitter to accomplish this. In
> >> pseudo code the route looks like this:
> >>
> >>   from(sjms:...).threads(10).process(converting...).split().to(sjms:...);
> >>
> >> If I set consumerCount=10 on the consumer endpoint AND use threads(10)
> >> then I can see that more threads are created but I get the "Only one
> >> thread may use ..." exception. Now, if I also set producerCount=10 on the
> >> producer endpoint then this exception goes away and my route works.
> >>
> >> Not exactly sure why this works. If anyone could explain the relationship
> >> between consumer/producer count and threads I would appreciate it. A
> >> theory of mine is that there must be at least as many consumers/producers
> >> as there are threads or there is a risk that two threads will try to use
> >> the same consumer/producer. If there is a one-to-one relationship between
> >> consumer/producer and JMS session then this could explain the exception.
> >> But this still sounds weird. If there is a pool of consumers/producers
> >> then the thread should try to acquire one from the pool. If no
> >> consumer/producer is available then the thread should wait until one is
> >> available - not try to use one that is in use by another thread.
> >>
> >> Also, although I can get my route to work this way, I get no better
> >> throughput than if I only use one thread. I get the exact same throughput
> >> and my CPU is basically idling. Obviously I'm not doing this right.
> >>
> >> Do I also need multiple connections to the JMS server? Could this affect
> >> the concurrency?
> >>
> >> /Bengt
> >>
> >>
> >>
> >>
> >>
> >> 2014-03-17 8:39 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> >>
> >> > I've now tried just using consumerCount on the endpoint without using
> >> > threads(). However, it doesn't make it multithreaded. Only one thread
> >> > is started for the route.
> >> >
> >> > Any other ideas? Has someone used sjms with a multithreaded consumer?
> >> >
> >> > /Bengt
> >> >
> >> >
> >> > 2014-03-14 17:42 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> >> >
> >> >> Ok - thanks. I have tried it but only together with threads(). I didn't
> >> >> realize that it might create threads on its own.
> >> >>
> >> >> /Bengt
> >> >> On 14 Mar 2014 17:26, "Claus Ibsen" <cl...@gmail.com> wrote:
> >> >>
> >> >>> Hi
> >> >>>
> >> >>> I think you should use the consumerCount option on the endpoint instead
> >> >>>
> >> >>>
> >> >>> --
> >> >>> Claus Ibsen
> >> >>> -----------------
> >> >>> Red Hat, Inc.
> >> >>> Email: cibsen@redhat.com
> >> >>> Twitter: davsclaus
> >> >>> Blog: http://davsclaus.com
> >> >>> Author of Camel in Action: http://www.manning.com/ibsen
> >> >>> Make your Camel applications look hawt, try: http://hawt.io
> >> >>>
> >> >>
> >> >
> >>
> >
> >
> >
> > --
> > Charles Moulliard
> > Apache Committer / Architect @RedHat
> > Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io
> >
> >
>
>
> --
> Charles Moulliard
> Apache Committer / Architect @RedHat
> Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io
>



-- 
-- 
Scott England-Sullivan
Apache Camel Committer
Principal Consultant / Sr. Architect | Red Hat, Inc.
FuseSource is now part of Red Hat
Web:     fusesource.com <http://www.fusesource.com> |
redhat.com<http://www.redhat.com>
Blog:     sully6768.blogspot.com
Twitter: sully6768
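
The pooling behaviour Bengt asks for in the quoted update (a thread takes a producer from the pool and waits if none is free, rather than sharing one that is in use) can be sketched in plain Java with a blocking queue. This is not how sjms is implemented; FakeProducer and ProducerPoolSketch are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a producer bound to one session; not an sjms class. */
final class FakeProducer {
    private final AtomicInteger users = new AtomicInteger();
    // Highest number of threads ever seen inside send() at the same moment.
    final AtomicInteger maxConcurrentUsers = new AtomicInteger();

    void send(String body) throws InterruptedException {
        int now = users.incrementAndGet();
        maxConcurrentUsers.accumulateAndGet(now, Math::max);
        Thread.sleep(1); // simulate the time spent talking to the broker
        users.decrementAndGet();
    }
}

final class ProducerPoolSketch {
    private final List<FakeProducer> all = new ArrayList<>();
    private final BlockingQueue<FakeProducer> idle;

    ProducerPoolSketch(int producerCount) {
        idle = new ArrayBlockingQueue<>(producerCount);
        for (int i = 0; i < producerCount; i++) {
            FakeProducer p = new FakeProducer();
            all.add(p);
            idle.add(p);
        }
    }

    void send(String body) throws InterruptedException {
        FakeProducer p = idle.take(); // blocks until a producer is free
        try {
            p.send(body);
        } finally {
            idle.add(p); // hand the producer back to the pool
        }
    }

    /** Sends messages from many threads and reports the worst sharing seen on any producer. */
    static int maxSharing(int threads, int producerCount, int messages) {
        ProducerPoolSketch pool = new ProducerPoolSketch(producerCount);
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        try {
            List<Future<Void>> pending = new ArrayList<>();
            for (int i = 0; i < messages; i++) {
                final String body = "msg-" + i;
                Callable<Void> task = () -> {
                    pool.send(body);
                    return null;
                };
                pending.add(workers.submit(task));
            }
            for (Future<Void> f : pending) {
                f.get(); // wait for completion and surface any failure
            }
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            workers.shutdown();
        }
        int max = 0;
        for (FakeProducer p : pool.all) {
            max = Math.max(max, p.maxConcurrentUsers.get());
        }
        return max;
    }
}
```

Calling ProducerPoolSketch.maxSharing(10, 2, 50) runs ten worker threads against two producers. Because each thread must take a producer out of the queue before using it, no producer is ever used by two threads at once, even when there are fewer producers than threads.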

Re: sjms and multiple threads

Posted by "Preben.Asmussen" <pr...@dr.dk>.
Bengt

If Weblogic is the bottleneck you can try to tune it by using a different
work manager.

/Preben



--
View this message in context: http://camel.465427.n5.nabble.com/sjms-and-multiple-threads-tp5748836p5748982.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: sjms and multiple threads

Posted by Bengt Rodehav <be...@rodehav.com>.
A little recap of what the real problem was...

Here is a thread on the Karaf user list where I discussed this problem:

http://mail-archives.apache.org/mod_mbox/karaf-user/201403.mbox/%3CCAJ0TPGLTVuvLhaX4rJv8puqk7cirbUryaTu7=rmWzuZwza0cnA@mail.gmail.com%3E

It turned out to be a 100 ms configured delay for the servicemix bundles
implementing the spec. This will be removed in future versions of Karaf.

/Bengt




2014-03-18 14:16 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:

> Here is some feedback.
>
> It seems like Weblogic is not the bottleneck. I can write about 300
> messages per second and read about 1400 messages per second.
>
> In fact, as long as I use at least three threads, Camel does not seem to
> be the problem. It's the unmarshalling of the XML file that takes >100 ms
> when I run it in Karaf. For some reason it only takes about 10 ms when I do
> it outside Karaf (as a JUnit test in Eclipse). Have no idea why since I'm
> unmarshalling the same file. In both cases I use the same Unmarshaller
> implementation (com.sun.xml.bind.v2.runtime.unmarshaller.UnmarshallerImpl).
>
> However this is not a Camel problem anymore. Thanks for your help.
>
> /Bengt
>
>
> 2014-03-17 20:33 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
>
> Hi!
>>
>> I use the latest version of Camel, 2.12.3.
>>
>> /Bengt
>> On 17 Mar 2014 18:10, "Scott England-Sullivan" <sully6768@gmail.com> wrote:
>>
>>> Hi Bengt,
>>>
>>> What version of SJMS are you using?  In earlier versions I had code that
>>> reused a single session for multiple consumers.  This in effect made it
>>> single threaded.  Later versions (2.13 & 2.12.3) have a patch that
>>> removes the session restriction and uses a session per consumer.
>>>
>>>
>>> On Mon, Mar 17, 2014 at 10:34 AM, Bengt Rodehav <be...@rodehav.com>
>>> wrote:
>>>
>>> > I've done some more research. It turns out that I was mistaken regarding
>>> > the throughput. I had been watching the JMX attribute "MeanProcessingTime"
>>> > which is the same regardless of the number of threads. However the actual
>>> > time taken to process 1000 messages is 115 seconds using one thread but 38
>>> > seconds using three threads. Using more threads than three does not
>>> > increase throughput.
>>> >
>>> > I think it's interesting that I can get my processing time down by (almost)
>>> > exactly a factor of three if I use three threads.
>>> >
>>> > Another interesting fact is that for every message I read from JMS I create
>>> > exactly three messages that I write to another queue. The number three
>>> > keeps coming up...
>>> >
>>> > Since I do about 1500 reads and 4500 writes from/to Weblogic JMS per minute
>>> > I guess it is possible that Weblogic JMS is the bottleneck. It is about 100
>>> > JMS operations/second.
>>> >
>>> > /Bengt
>>> >
>>> >
>>> >
>>> > 2014-03-17 13:55 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
>>> >
>>> > > Hello Charles,
>>> > >
>>> > > I looked at your unit test but I don't understand it all. It seems to use
>>> > > an InOut pattern and logs the threadId, which I guess is a way to see that
>>> > > different threads are used. I haven't logged that in my code yet.
>>> > >
>>> > > However, I do get multiple threads to "work" by using "consumerCount",
>>> > > "producerCount" and "threads" - all set to the same value. But I don't get
>>> > > any better throughput, which leads me to believe that somehow the execution
>>> > > is still serialized somewhere - i.e. there is a bottleneck somewhere.
>>> > >
>>> > > I think it should be a fairly common scenario to want to consume JMS
>>> > > messages in a multithreaded fashion in order to increase throughput. But
>>> > > perhaps no one has done this with sjms yet.
>>> > >
>>> > > /Bengt
>>> > >
>>> > >
>>> > >
>>> > >

Re: sjms and multiple threads

Posted by Bengt Rodehav <be...@rodehav.com>.
Here is some feedback.

It seems that Weblogic is not the bottleneck. I can write about 300
messages per second and read about 1400 messages per second.

In fact, as long as I use at least three threads, Camel does not seem to be
the problem either. It is the unmarshalling of the XML file that takes >100 ms
when I run it in Karaf. For some reason it takes only about 10 ms when I do it
outside Karaf (as a JUnit test in Eclipse). I have no idea why, since I'm
unmarshalling the same file and in both cases I use the same Unmarshaller
implementation (com.sun.xml.bind.v2.runtime.unmarshaller.UnmarshallerImpl).

So this is no longer a Camel problem. Thanks for your help.

/Bengt
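
A quick sanity check of the figures reported in this thread (1000 messages
in 115 seconds on one thread, 38 seconds on three) is consistent with that
conclusion. The sketch below is plain Java with no Camel dependency, and the
class and method names are made up for illustration. It computes the time
each thread spends per message and the resulting message rate, which can then
be compared with the >100 ms unmarshalling time and the measured Weblogic
rates.

```java
public class ThroughputCheck {

    /** Average time one thread spends on one message, in milliseconds. */
    public static double perMessageMillis(double totalSeconds, int messages, int threads) {
        // Total thread-time available divided by the number of messages handled.
        return totalSeconds * 1000.0 * threads / messages;
    }

    /** Overall rate at which incoming messages are processed. */
    public static double messagesPerSecond(double totalSeconds, int messages) {
        return messages / totalSeconds;
    }

    public static void main(String[] args) {
        // Figures taken from the thread: 1000 messages, 115 s (1 thread), 38 s (3 threads).
        System.out.printf("1 thread : %.1f ms per message%n", perMessageMillis(115, 1000, 1));
        System.out.printf("3 threads: %.1f ms per message per thread%n", perMessageMillis(38, 1000, 3));
        System.out.printf("3 threads: %.1f messages per second%n", messagesPerSecond(38, 1000));
    }
}
```

The per-message cost stays at roughly 115 ms per thread in both runs, so the
work scales with the thread count, and the overall rate remains well below
the 300 writes and 1400 reads per second measured against Weblogic.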



Re: sjms and multiple threads

Posted by Bengt Rodehav <be...@rodehav.com>.
Hi!

I use the latest version of Camel, 2.12.3.

/Bengt

Re: sjms and multiple threads

Posted by Scott England-Sullivan <su...@gmail.com>.
Hi Bengt,

Which version of SJMS are you using?  Earlier versions contained code of
mine that reused a single session for multiple consumers, which in effect
made consumption single threaded.  Later versions (2.13 & 2.12.3) include a
patch that removes the session restriction and uses a session per consumer.
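
To illustrate what "a session per consumer" buys, here is a minimal sketch in plain Java. It does not use the JMS or Camel APIs and is not the SJMS implementation: StubSession is a hypothetical stand-in that, like the session in the stack trace, rejects concurrent use, and failedSends is an illustrative helper. Because every worker gets its own session, no session is ever touched by two threads.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class SessionPerConsumerSketch {

    /** Hypothetical stand-in for a session that refuses concurrent use. */
    static final class StubSession {
        private final AtomicBoolean inUse = new AtomicBoolean(false);

        void send(String body) {
            // Fail fast if another thread is currently inside send().
            if (!inUse.compareAndSet(false, true)) {
                throw new IllegalStateException("Only one thread may use a session at a time");
            }
            try {
                // A real session would hand the message to the broker here.
            } finally {
                inUse.set(false);
            }
        }
    }

    /** Runs one worker per consumer, each with its own session; returns the number of rejected sends. */
    static int failedSends(int consumers, int messagesPerConsumer) {
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        AtomicInteger failures = new AtomicInteger();
        for (int c = 0; c < consumers; c++) {
            StubSession session = new StubSession(); // one session per consumer, never shared
            pool.submit(() -> {
                for (int m = 0; m < messagesPerConsumer; m++) {
                    try {
                        session.send("message-" + m);
                    } catch (IllegalStateException e) {
                        failures.incrementAndGet();
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return failures.get();
    }

    public static void main(String[] args) {
        System.out.println("failed sends: " + failedSends(3, 1000));
    }
}
```

If all workers shared a single StubSession instead, some sends could be rejected whenever two threads overlapped, which is the situation the exception message describes.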





-- 
Scott England-Sullivan
Apache Camel Committer
Principal Consultant / Sr. Architect | Red Hat, Inc.
FuseSource is now part of Red Hat
Web:     fusesource.com <http://www.fusesource.com> |
redhat.com<http://www.redhat.com>
Blog:     sully6768.blogspot.com
Twitter: sully6768

Re: sjms and multiple threads

Posted by Bengt Rodehav <be...@rodehav.com>.
I've done some more research. It turns out that I was mistaken regarding
the throughput. I had been watching the JMX attribute "MeanProcessingTime",
which is the same regardless of the number of threads. However, the actual
time taken to process 1000 messages is 115 seconds using one thread but 38
seconds using three threads. Using more than three threads does not
increase throughput any further.

I think it's interesting that I can cut the total processing time by
(almost) exactly a factor of three if I use three threads.

Another interesting fact is that for every message I read from JMS, I create
exactly three messages that I write to another queue. The number three
keeps coming up...

Since I do about 1500 reads and 4500 writes from/to Weblogic JMS per minute,
I guess it is possible that Weblogic JMS is the bottleneck. That is about 100
JMS operations per second.
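
As a rough check of these figures, the arithmetic can be sketched in plain Java. The class and method names are made up for illustration; the inputs (1000 messages, 115 and 38 seconds, three writes per read) are the ones given above.

```java
public class ThroughputEstimate {

    /** Messages consumed per second for a batch processed in the given time. */
    static double messagesPerSecond(int messages, double seconds) {
        return messages / seconds;
    }

    /** JMS operations per second: one read plus writesPerRead writes for every message. */
    static double jmsOpsPerSecond(int messages, double seconds, int writesPerRead) {
        return messagesPerSecond(messages, seconds) * (1 + writesPerRead);
    }

    public static void main(String[] args) {
        double speedup = 115.0 / 38.0;                            // one thread vs three threads
        double readsPerMinute = messagesPerSecond(1000, 38) * 60; // reads from the source queue
        double opsPerSecond = jmsOpsPerSecond(1000, 38, 3);       // reads plus writes

        System.out.println("speedup: " + speedup);
        System.out.println("reads per minute: " + readsPerMinute);
        System.out.println("JMS operations per second: " + opsPerSecond);
    }
}
```

With these inputs the speedup is about 3.0, the read rate is roughly 1580 messages per minute, and the total comes to roughly 105 JMS operations per second, which agrees with the rounded numbers above.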

/Bengt




Re: sjms and multiple threads

Posted by Bengt Rodehav <be...@rodehav.com>.
Hello Charles,

I looked at your unit test but I don't understand all of it. It seems to use
an InOut pattern and logs the thread ID, which I guess is a way to see that
different threads are used. I haven't logged that in my code yet.

However, I do get multiple threads to "work" by using "consumerCount",
"producerCount" and "threads" - all set to the same value. But I don't get
any better throughput, which leads me to believe that the execution is
still serialized somewhere, i.e. there is a bottleneck.
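
For reference, the combination described above can be sketched as follows. This is only an illustration, not the actual route: the queue names "in.queue" and "out.queue" and the helper methods are hypothetical, and the code merely builds the endpoint URIs so that it runs without Camel on the classpath. The commented line shows roughly where they would go in a RouteBuilder.

```java
public class SjmsEndpointUris {

    /** Builds a consumer endpoint URI with the consumerCount option set. */
    static String consumerUri(String queue, int count) {
        return "sjms:queue:" + queue + "?consumerCount=" + count;
    }

    /** Builds a producer endpoint URI with the producerCount option set. */
    static String producerUri(String queue, int count) {
        return "sjms:queue:" + queue + "?producerCount=" + count;
    }

    public static void main(String[] args) {
        int n = 3; // same value for consumers, producers and threads

        String from = consumerUri("in.queue", n);  // hypothetical queue name
        String to = producerUri("out.queue", n);   // hypothetical queue name

        // Inside RouteBuilder.configure() these would be used roughly as:
        //   from(from).threads(n).process(...).split(...).to(to);
        System.out.println(from); // prints sjms:queue:in.queue?consumerCount=3
        System.out.println(to);   // prints sjms:queue:out.queue?producerCount=3
    }
}
```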

I think it should be a fairly common scenario to want to consume JMS
messages in a multithreaded fashion in order to increase throughput. But
perhaps no one has done this with sjms yet.

/Bengt




2014-03-17 11:27 GMT+01:00 Charles Moulliard <ch...@gmail.com>:

> Commit available here :
>
> https://git-wip-us.apache.org/repos/asf?p=camel.git;a=commit;h=1a7b676e1b5c29b652e3faf16240ee3cc831a0c9
>
>
> On Mon, Mar 17, 2014 at 11:20 AM, Charles Moulliard <ch007m@gmail.com
> >wrote:
>
> > I will commit soon a unit test. That could help us to verify if something
> > goes wrong.
> >
> >
> > On Mon, Mar 17, 2014 at 9:38 AM, Bengt Rodehav <be...@rodehav.com>
> wrote:
> >
> >> An update.
> >>
> >> I noticed in the stack trace that it seems to be the *production* of
> >> messages that get the exception "Unable to complete sending the message:
> >> Only one thread may use a JMS Session at a time." - not the
> *consumption*.
> >> I also only showed you the first part of the route (cause I didn't think
> >> the rest mattered). What I do is basically reading xml messages from one
> >> queue, transforming them and the sending them to another queue.
> >>
> >> Moreover, one incoming message may end up creating more than one (in
> this
> >> case 3) out messages. I'm using a splitter to accomplish this. In pseudo
> >> code the route looks like this:
> >>
> >>
> from(sjms:...).threads(10).process(converting...).split().to(sjms:...);
> >>
> >> If I set consumerCount=10 on the consumer endpoint AND use threads(10)
> >> then
> >> I can see that more threads are created but I get the "Only one thread
> may
> >> use ..." exception. Now, if I also set the producerCount=10 on the
> >> producer
> >> endpoint then this exception goes away and my route works.
> >>
> >> Not exactly sure why this works. If anyone could explain the
> relationship
> >> between consumer/producer count and threads I would appreciate it. A
> >> theory
> >> of mine is that there must be at least as many consumers/producers as
> >> there
> >> are threads or there is a risk that two threads will try to use the same
> >> consumer/producer. If there is a one-to-one relationship between
> >> consumer/producer and JMS session then this could explain the exception.
> >> But this still sounds weird. If there is a pool of consumers/producers
> >> then
> >> the thread should try to acquire one from the pool. If no
> >> consumer/producer
> >> is available then the thread should wait until one is available - not
> try
> >> to use one that is in use by another thread.
> >>
> >> Also, although I can get my route to work this way, I get no better
> >> throughput than if I only use one thread. I get the exact same
> throughput
> >> and my CPU is basically idling. Obviously I'm not doing this right.
> >>
> >> Do I also need multiple connections to the JMS server? Could this affect
> >> the concurrency?
> >>
> >> /Bengt
> >>
> >>
> >>
> >>
> >>
> >> 2014-03-17 8:39 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> >>
> >> > I've now tried just using consumerCount on the endpoint without using
> >> > threads(). However, it doesn't make it multithreaded. Only one thread
> is
> >> > started for the route.
> >> >
> >> > Any other ideas? Has someone used sjms with a multithreaded consumer?
> >> >
> >> > /Bengt
> >> >
> >> >
> >> > 2014-03-14 17:42 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:
> >> >
> >> > Ok - thanks. I have tried it but only together with threads(). I
> didn't
> >> >> realize that it might create threads on its own.
> >> >>
> >> >> /Bengt

Re: sjms and multiple threads

Posted by Charles Moulliard <ch...@gmail.com>.
The commit is available here:
https://git-wip-us.apache.org/repos/asf?p=camel.git;a=commit;h=1a7b676e1b5c29b652e3faf16240ee3cc831a0c9




-- 
Charles Moulliard
Apache Committer / Architect @RedHat
Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io

Re: sjms and multiple threads

Posted by Charles Moulliard <ch...@gmail.com>.
I will commit a unit test soon. That should help us verify whether
something is going wrong.





-- 
Charles Moulliard
Apache Committer / Architect @RedHat
Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io

Re: sjms and multiple threads

Posted by Bengt Rodehav <be...@rodehav.com>.
An update.

I noticed in the stack trace that it seems to be the *production* of
messages that gets the exception "Unable to complete sending the message:
Only one thread may use a JMS Session at a time." - not the *consumption*.
I also only showed you the first part of the route (because I didn't think
the rest mattered). What I do is basically read XML messages from one
queue, transform them and then send them to another queue.

Moreover, one incoming message may end up creating more than one outgoing
message (three in this case). I'm using a splitter to accomplish this. In
pseudo code the route looks like this:

  from(sjms:...).threads(10).process(converting...).split().to(sjms:...);

If I set consumerCount=10 on the consumer endpoint AND use threads(10) then
I can see that more threads are created but I get the "Only one thread may
use ..." exception. Now, if I also set producerCount=10 on the producer
endpoint then this exception goes away and my route works.

Not exactly sure why this works. If anyone could explain the relationship
between consumer/producer count and threads I would appreciate it. A theory
of mine is that there must be at least as many consumers/producers as there
are threads or there is a risk that two threads will try to use the same
consumer/producer. If there is a one-to-one relationship between
consumer/producer and JMS session then this could explain the exception.
But this still sounds weird. If there is a pool of consumers/producers then
the thread should try to acquire one from the pool. If no consumer/producer
is available then the thread should wait until one is available - not try
to use one that is in use by another thread.
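
To illustrate the behaviour described in the paragraph above (a thread waits
for a free producer instead of sharing one that is in use), here is a minimal
plain-Java sketch. It does not use Camel or JMS and is not how sjms is
implemented: FakeSession and SessionPoolSketch are hypothetical names invented
for this illustration, and FakeSession only imitates a session that rejects
concurrent use.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a JMS Session: it rejects concurrent use. */
class FakeSession {
    private final AtomicBoolean inUse = new AtomicBoolean(false);

    void send(String body) {
        // Imitate the provider's guard: fail if another thread is inside send().
        if (!inUse.compareAndSet(false, true)) {
            throw new IllegalStateException("Only one thread may use a session at a time");
        }
        try {
            Thread.yield(); // widen the window in which an overlap could be observed
        } finally {
            inUse.set(false);
        }
    }
}

class SessionPoolSketch {
    /**
     * Runs `threads` workers that each send `sendsPerThread` messages through a
     * pool of `poolSize` sessions. Returns how many sends were rejected because
     * a session was used by two threads at once.
     */
    static int run(int threads, int poolSize, int sendsPerThread) {
        BlockingQueue<FakeSession> pool = new ArrayBlockingQueue<>(poolSize);
        for (int i = 0; i < poolSize; i++) {
            pool.add(new FakeSession());
        }
        AtomicInteger violations = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            workers.submit(() -> {
                for (int i = 0; i < sendsPerThread; i++) {
                    try {
                        FakeSession session = pool.take(); // blocks until a session is free
                        try {
                            session.send("message " + i);
                        } catch (IllegalStateException e) {
                            violations.incrementAndGet();
                        } finally {
                            pool.put(session); // hand the session back to the pool
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }
        workers.shutdown();
        try {
            if (!workers.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for workers", e);
        }
        return violations.get();
    }

    public static void main(String[] args) {
        // 10 worker threads share only 3 sessions.
        System.out.println("violations: " + run(10, 3, 1000));
    }
}
```

Because every worker takes a session from the blocking queue before sending,
no session is ever shared, even with more threads than sessions. The surplus
threads simply wait. Whether sjms hands out its producers this way is exactly
the open question in this thread.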

Also, although I can get my route to work this way, I get no better
throughput than if I only use one thread. I get the exact same throughput
and my CPU is basically idling. Obviously I'm not doing this right.

Do I also need multiple connections to the JMS server? Could this affect
the concurrency?

/Bengt






Re: sjms and multiple threads

Posted by Charles Moulliard <ch...@gmail.com>.
Hi Bengt,

The sjms component does not yet include a unit test for concurrentConsumers.
I suspect that the different consumers are not each using their own thread
here, but are all sharing the one created for the first JMS session.
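
One way a test could detect that situation is to record the name of the
thread that handles each message and count the distinct names. The sketch
below shows only that counting technique in plain Java. ConsumerThreadCheck
is a hypothetical name, a thread pool stands in for the consumers, and no
Camel or sjms API is involved. In a real test the recording line would
presumably sit in a processor on the route.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConsumerThreadCheck {
    /**
     * Starts `consumers` simulated message handlers and returns the number of
     * distinct threads that actually ran them.
     */
    static int countHandlerThreads(int consumers) {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        CountDownLatch allStarted = new CountDownLatch(consumers);
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (int i = 0; i < consumers; i++) {
            pool.submit(() -> {
                // This is the line a real test would put in its message handler.
                threadNames.add(Thread.currentThread().getName());
                allStarted.countDown();
                try {
                    // Hold this thread until every handler has started, so the
                    // handlers demonstrably overlap in time.
                    allStarted.await(10, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return threadNames.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct threads: " + countHandlerThreads(10));
    }
}
```

If the count stays at one while consumerCount is set to ten, the consumers
are sharing a single thread, which is what is suspected above.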

Regards,



On Mon, Mar 17, 2014 at 8:39 AM, Bengt Rodehav <be...@rodehav.com> wrote:

> I've now tried just using consumerCount on the endpoint without using
> threads(). However, it doesn't make it multithreaded. Only one thread is
> started for the route.
>
> Any other ideas? Has someone used sjms with a multithreaded consumer?
>
> /Bengt



-- 
Charles Moulliard
Apache Committer / Architect @RedHat
Twitter : @cmoulliard | Blog :  http://cmoulliard.github.io

Re: sjms and multiple threads

Posted by Bengt Rodehav <be...@rodehav.com>.
I've now tried just using consumerCount on the endpoint without using
threads(). However, it doesn't make it multithreaded. Only one thread is
started for the route.

Any other ideas? Has anyone used sjms with a multithreaded consumer?

/Bengt


2014-03-14 17:42 GMT+01:00 Bengt Rodehav <be...@rodehav.com>:

> Ok - thanks. I have tried it but only together with threads(). I didn't
> realize that it might create threads on its own.
>
> /Bengt

Re: sjms and multiple threads

Posted by Bengt Rodehav <be...@rodehav.com>.
Ok - thanks. I have tried it but only together with threads(). I didn't
realize that it might create threads on its own.

/Bengt
On 14 Mar 2014 at 17:26, "Claus Ibsen" <cl...@gmail.com> wrote:

> Hi
>
> I think you should use the consumerCount option on the endpoint instead

Re: sjms and multiple threads

Posted by Claus Ibsen <cl...@gmail.com>.
Hi

I think you should use the consumerCount option on the endpoint instead
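
As a rough illustration of that suggestion, a route using the option might look like the sketch below. The queue names and the class name are made up, and it assumes camel-core and camel-sjms are on the classpath and that an "sjms" component with a ConnectionFactory is already registered in the CamelContext. Note that a later reply in this thread reports that consumerCount alone still started only one thread in that setup, so treat this as the shape of the configuration rather than a confirmed fix.

```java
import org.apache.camel.builder.RouteBuilder;

// Sketch only: queue names are hypothetical, and the "sjms" component is
// assumed to be configured elsewhere with a ConnectionFactory.
public class ConcurrentSjmsRoute extends RouteBuilder {
    @Override
    public void configure() {
        // consumerCount asks the sjms endpoint for several consumers,
        // so the route itself contains no threads() call.
        from("sjms:queue:in.queue?consumerCount=10")
            .to("sjms:queue:out.queue");
    }
}
```

The point of the design is that concurrency is requested from the component, which can create its own JMS resources per consumer, instead of being added afterwards by a thread pool in the middle of the route.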

On Fri, Mar 14, 2014 at 5:10 PM, Bengt Rodehav <be...@rodehav.com> wrote:
> I'm using Camel 2.12.3 and the sjms component for communicating with
> Weblogic JMS.
>
> Everything works fine when I use a single thread. However, to increase
> throughput I want multiple threads to read messages from the queue.
>
> I've done this by using the "threads()" method:
>
>   from(sjms:...).threads(10)....
>
> However I get an exception as follows:
>
> java.lang.Exception: Unable to complete sending the message: Only one
> thread may use a JMS Session at a time.:Only one thread may use a JMS
> Session at a time.
>
> What exactly does this mean and how can I consume messages via sjms using
> multiple threads?
>
> /Bengt



-- 
Claus Ibsen
-----------------
Red Hat, Inc.
Email: cibsen@redhat.com
Twitter: davsclaus
Blog: http://davsclaus.com
Author of Camel in Action: http://www.manning.com/ibsen
Make your Camel applications look hawt, try: http://hawt.io