Posted to users@camel.apache.org by enalposi <en...@yahoo.com> on 2011/07/06 22:00:18 UTC

Each concurrentConsumer on JMS Topic receives ALL msgs

Hi - 

The answer may be as simple as 'that's the way it is', but what we observe is
not ideal:
Basically a pool of 'listeners'
(jms:topic:super_topic?concurrentConsumers=10) all receive the same message,
whereas ideally only one thread would handle each msg and different messages
would be processed concurrently in the thread pool.

This (obviously) works fine with queues, since only one consumer can pick the
msg off the transport, but we need several hosts to receive the payload. Is
this expected behavior, and do we have to manage our own pool of async
processing resources behind a single Topic listener thread?

Thanks!

--
View this message in context: http://camel.465427.n5.nabble.com/Each-concurrentConsumer-on-JMS-Topic-receives-ALL-msgs-tp4558687p4558687.html
Sent from the Camel - Users mailing list archive at Nabble.com.

Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by Tim <ch...@gmail.com>.
Claus to the rescue ;)
While waiting for the fix, Enalposi, maybe you can try a slightly uglier
wiretap to do the processing, or something similar.
In theory this will keep the request/response cycle short.

On Thu, Jul 7, 2011 at 11:10 AM, Claus Ibsen <cl...@gmail.com> wrote:

> The JmsConsumer will wait for the Exchange to complete.
>
> There is a ticket to improve this by supporting the async routing
> engine on the consumer level for InOnly JMS consumers.
> https://issues.apache.org/jira/browse/CAMEL-3632
>

Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by enalposi <en...@yahoo.com>.
Alright, I coded a simple solution basically using a bounded LinkedList as an
LRU cache for filtering already processed message IDs on the callback.

This is not the most efficient approach (overhead piping payload through
Spring/Camel, before reaching my filtering logic & potential for contention
in high-throughput scenarios) but it should do for us until we hopefully get
a proper fix in Camel 2.9.
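
For readers who want to try the same idea, here is a minimal sketch of such a bounded "already seen" filter. It is not the poster's code: it swaps the LinkedList for a LinkedHashMap in access order, which gives the same LRU eviction with constant-time lookups. The class name SeenMessageIds and the method firstTime are hypothetical, and it assumes each message carries a unique string ID (for example the JMSMessageID).

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: a bounded, thread-safe record of recently seen message IDs.
final class SeenMessageIds {
    private final Map<String, Boolean> seen;

    SeenMessageIds(final int capacity) {
        // accessOrder=true keeps entries in least-recently-used order.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                // Evict the least recently used ID once the bound is exceeded.
                return size() > capacity;
            }
        };
    }

    /** Returns true only for the first caller that presents a given ID. */
    synchronized boolean firstTime(String messageId) {
        // put() returns the previous value, so null means the ID was not recorded yet.
        return seen.put(messageId, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        SeenMessageIds ids = new SeenMessageIds(1000);
        System.out.println(ids.firstTime("ID:1")); // true
        System.out.println(ids.firstTime("ID:1")); // false
    }
}
```

Each of the concurrent consumers would call firstTime(...) on receipt and drop the message when it returns false. The single lock is the contention point mentioned above, and an ID that has been evicted will be accepted again, so the capacity has to exceed the number of messages that can be in flight at once.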

Thanks.


Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by enalposi <en...@yahoo.com>.
Thanks Claus - 

In the meantime, do you have an opinion on how to work around it? I see two
options: (a) go via concurrentConsumers and somehow filter duplicate messages,
or (b) hand the payload off to a custom async worker thread pool. (a) seems
quicker to do, but I am not so sure about the overhead of handling all the
dupe messages before they hit the filtering logic.

Thanks.


Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by Claus Ibsen <cl...@gmail.com>.
The JmsConsumer will wait for the Exchange to complete.

There is a ticket to improve this by supporting the async routing
engine on the consumer level for InOnly JMS consumers.
https://issues.apache.org/jira/browse/CAMEL-3632
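
The effect of a consumer that waits for each exchange can be imitated in plain Java. The sketch below is not Camel code and makes no claim about Camel internals: it is a java.util.concurrent simulation in which one calling thread plays the listener and a fixed pool plays threads(n). HandOffDemo and peakConcurrency are hypothetical names, and the 100 ms sleep stands in for the bean's processing time.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a single listener thread feeding a worker pool.
final class HandOffDemo {

    /**
     * Submits `messages` sleeping tasks to a pool of `workers` threads from the
     * calling ("listener") thread and returns the peak number of tasks that were
     * running at the same time.
     */
    static int peakConcurrency(int messages, int workers, boolean waitForEach) {
        ExecutorService pool = Executors.newFixedThreadPool(workers);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        try {
            for (int i = 0; i < messages; i++) {
                Future<?> done = pool.submit(() -> {
                    peak.accumulateAndGet(running.incrementAndGet(), Math::max);
                    try {
                        Thread.sleep(100); // simulated processing time
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    } finally {
                        running.decrementAndGet();
                    }
                });
                if (waitForEach) {
                    done.get(); // the listener blocks until this message is processed
                }
            }
            pool.shutdown();
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow(); // nothing left to cancel if the pool already terminated
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("blocking hand-off peak: " + peakConcurrency(10, 5, true)); // 1
        System.out.println("async hand-off peak:    " + peakConcurrency(10, 5, false));
    }
}
```

With waitForEach=true the pool threads take turns but never overlap, which is the pattern visible in the log posted earlier in this thread. With waitForEach=false the listener returns immediately and the pool can run several messages at once, which is what option (b), a custom async worker pool behind a single listener, amounts to.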

On Thu, Jul 7, 2011 at 6:01 PM, enalposi <en...@yahoo.com> wrote:
> Hi Tim -
>
> Using Camel 2.7.2.
> I actually had the ExchangePattern before threads() originally - the above
> is just one permutation of several I tried (just moved it back - same
> result).
> Yes, I guess it should block if the 'consumer'/publisher is expecting a
> response. But it's using OutOnly ExchangePattern. Additionally I set
> disableReplyTo=true which Claus pointed out in some other post to really
> make this one-directional.
>
> Regarding ack - shouldn't in this configuration the ack be returned by the
> jms consumer thread, before handing it over to the worker thread pool? From
> what I read it defaults to AUTO_ACKNOWLEDGE. I opened up both Camel and
> Spring with DEBUG log level but see nothing related. On the Camel API side I
> am not sure how to get a handle on anything to acknowledge given the
> transport agnostic level of abstraction if I wanted to try out CLIENT_ACK.
>
> Cheers.
>
>



-- 
Claus Ibsen
-----------------
FuseSource
Email: cibsen@fusesource.com
Web: http://fusesource.com
Twitter: davsclaus, fusenews
Blog: http://davsclaus.blogspot.com/
Author of Camel in Action: http://www.manning.com/ibsen/

Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by enalposi <en...@yahoo.com>.
Hi Tim - 

Using Camel 2.7.2.
I actually had the ExchangePattern before threads() originally - the above
is just one permutation of several I tried (just moved it back - same
result).
Yes, I guess it should block if the 'consumer'/publisher is expecting a
response. But it's using OutOnly ExchangePattern. Additionally I set
disableReplyTo=true which Claus pointed out in some other post to really
make this one-directional.

Regarding ack - shouldn't in this configuration the ack be returned by the
jms consumer thread, before handing it over to the worker thread pool? From
what I read it defaults to AUTO_ACKNOWLEDGE. I opened up both Camel and
Spring with DEBUG log level but see nothing related. On the Camel API side I
am not sure how to get a handle on anything to acknowledge given the
transport agnostic level of abstraction if I wanted to try out CLIENT_ACK.

Cheers.


Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by Tim <ch...@gmail.com>.
Ok. The behavior you are seeing makes sense to me if the consumer is waiting
for a 'response'.
I haven't looked at the code but there could be several reasons for this in
my mind.
1. Can you check the ack mode that you have on the topic?
2. Can you move the setting of the exchange pattern up to before the
threads?

You can do this in 2 ways: simply move that call, or set it on the endpoint URI:


jms:topic:super-topic?exchangePattern=InOnly


Also, what version of Camel is this?

-Tim



On Thu, Jul 7, 2011 at 9:34 AM, enalposi <en...@yahoo.com> wrote:

> Hey Tim.
>
> Sorry, I have everything parameterized and missed a few values. The
> ExchangePattern is InOnly (the system exhibits the same behavior for OutOnly,
> for what it's worth...). sleepTime is a constant 1000 in this case - you can
> see nicely in the log how threads have a 1sec gap between the 'start' and
> 'end' log lines and how different worker threads from the pool are picked up
> sequentially.
>
> I initially observed this behavior on queues until figuring out that
> threads(...) != concurrentConsumers...
> But even for queues I am suspicious... if I, for example, set
> concurrentConsumers=5 and threads(1), it will process sequentially. I would
> have assumed the concurrentConsumer threads would automatically multi-thread
> downstream, but this is not the case. I have to set both concurrentConsumers
> and threads(...) to the same number to achieve concurrency.
>
> Based on these observations I suspect either a bug in the Camel worker
> thread pool code or ignorance on my part about how to use it correctly :-)
>
>

Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by enalposi <en...@yahoo.com>.
Hey Tim.

Sorry, I have everything parameterized and missed a few values. The
ExchangePattern is InOnly (the system exhibits the same behavior for OutOnly,
for what it's worth...). sleepTime is a constant 1000 in this case - you can
see nicely in the log how threads have a 1sec gap between the 'start' and
'end' log lines and how different worker threads from the pool are picked up
sequentially.

I initially observed this behavior on queues until figuring out that
threads(...) != concurrentConsumers...
But even for queues I am suspicious... if I, for example, set
concurrentConsumers=5 and threads(1), it will process sequentially. I would
have assumed the concurrentConsumer threads would automatically multi-thread
downstream, but this is not the case. I have to set both concurrentConsumers
and threads(...) to the same number to achieve concurrency.

Based on these observations I suspect either a bug in the Camel worker
thread pool code or ignorance on my part about how to use it correctly :-)


Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by Tim <ch...@gmail.com>.
What is your ExchangePattern in setExchangePattern(pattern)?
Also are you mixing messages with sleep and non sleep values?
Are the sleep values all the same?

On Thu, Jul 7, 2011 at 8:18 AM, enalposi <en...@yahoo.com> wrote:

> Right, if I set concurrentConsumers=1 and call threads(10) while building the
> route, I do end up with a thread pool; however, these threads are used
> serially.
>
> The configuration is as follows:
>
>        final String from = "jms:topic:esb_test_topic?concurrentConsumers=1";
>        final String to   = "bean:jmsTopicListener?method=process";
>        RouteBuilder builder = new RouteBuilder(camelCtx_) {
>            @Override
>            public void configure() throws Exception {
>                onException(InvalidDestinationException.class, UnknownReplyMessageException.class)
>                        .process(new CamelExceptionHandlerImpl(from, to))
>                        .stop();
>                from(from).threads(5).setExchangePattern(pattern).to(to);
>            }
>        };
>
> The 'listener' bean simply prints a message on receipt, Thread.sleeps for
> 1sec and prints another msg with the following output: Threads are being
> picked out of the pool with 1sec gaps.
>
>    public void process(Object _msg) {
>        log.info(String.format("Received(|->) [%1$s/%2$s]: %3$s", df.format(new Date()), beanName_, _msg));
>        if (sleepTime_ > 0) {
>            if (_msg instanceof SampleDto) {
>                SampleDto dto = (SampleDto) _msg;
>                dto.setThreadName(Thread.currentThread().getName());
>                data_.add(dto);
>            }
>            try {
>                Thread.sleep(sleepTime_);
>            } catch (InterruptedException e) {
>                e.printStackTrace();
>            }
>        }
>        log.info(String.format("Received(<-|) [%1$s/%2$s]: %3$s", df.format(new Date()), beanName_, _msg));
>    }
>
> Something in the Camel libs seems to be blocking...
>
> 2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-17                        | Sent [20110707 09:03:08:335]: SampleDtoImpl[data_13;thread=Thread-17]
> 2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-6                         | Sent [20110707 09:03:08:335]: SampleDtoImpl[data_2;thread=Thread-6]
> 2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-16                        | Sent [20110707 09:03:08:335]: SampleDtoImpl[data_12;thread=Thread-16]
> 2011-07-07 09:03:08,351|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-5                         | Sent [20110707 09:03:08:351]: SampleDtoImpl[data_1;thread=Thread-5]
> 2011-07-07 09:03:08,351|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-8                         | Sent [20110707 09:03:08:351]: SampleDtoImpl[data_4;thread=Thread-8]
> 2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-14                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_10;thread=Thread-14]
> 2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-13                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_9;thread=Thread-13]
> 2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-18                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_14;thread=Thread-18]
> 2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-4                         | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_0;thread=Thread-4]
> 2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-10                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_6;thread=Thread-10]
> 2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-15                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_11;thread=Thread-15]
> 2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-9                         | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_5;thread=Thread-9]
> 2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-11                        | Sent [20110707 09:03:08:382]: SampleDtoImpl[data_7;thread=Thread-11]
> 2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-12                        | Sent [20110707 09:03:08:382]: SampleDtoImpl[data_8;thread=Thread-12]
> 2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-7                         | Sent [20110707 09:03:08:382]: SampleDtoImpl[data_3;thread=Thread-7]
> 2011-07-07 09:03:08,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(|->) [20110707 09:03:08:304/esbTestJmsTopicListener]: SampleDtoImpl[data_13;thread=Thread-17]
> 2011-07-07 09:03:09,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(<-|) [20110707 09:03:09:304/esbTestJmsTopicListener]: SampleDtoImpl[data_13;thread=Camel (camel-4) thread #11 - Threads]
> 2011-07-07 09:03:09,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(|->) [20110707 09:03:09:304/esbTestJmsTopicListener]: SampleDtoImpl[data_4;thread=Thread-8]
> 2011-07-07 09:03:10,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(<-|) [20110707 09:03:10:304/esbTestJmsTopicListener]: SampleDtoImpl[data_4;thread=Camel (camel-4) thread #12 - Threads]
> 2011-07-07 09:03:10,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(|->) [20110707 09:03:10:304/esbTestJmsTopicListener]: SampleDtoImpl[data_1;thread=Thread-5]
> 2011-07-07 09:03:11,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(<-|) [20110707 09:03:11:304/esbTestJmsTopicListener]: SampleDtoImpl[data_1;thread=Camel (camel-4) thread #13 - Threads]
> 2011-07-07 09:03:11,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #14 - Threads | Received(|->) [20110707 09:03:11:304/esbTestJmsTopicListener]: SampleDtoImpl[data_2;thread=Thread-6]
> 2011-07-07 09:03:12,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #14 - Threads | Received(<-|) [20110707 09:03:12:304/esbTestJmsTopicListener]: SampleDtoImpl[data_2;thread=Camel (camel-4) thread #14 - Threads]
> 2011-07-07 09:03:12,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(|->) [20110707 09:03:12:304/esbTestJmsTopicListener]: SampleDtoImpl[data_12;thread=Thread-16]
> 2011-07-07 09:03:13,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(<-|) [20110707 09:03:13:304/esbTestJmsTopicListener]: SampleDtoImpl[data_12;thread=Camel (camel-4) thread #15 - Threads]
> 2011-07-07 09:03:13,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(|->) [20110707 09:03:13:304/esbTestJmsTopicListener]: SampleDtoImpl[data_9;thread=Thread-13]
> 2011-07-07 09:03:14,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(<-|) [20110707 09:03:14:320/esbTestJmsTopicListener]: SampleDtoImpl[data_9;thread=Camel (camel-4) thread #11 - Threads]
> 2011-07-07 09:03:14,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(|->) [20110707 09:03:14:320/esbTestJmsTopicListener]: SampleDtoImpl[data_6;thread=Thread-10]
> 2011-07-07 09:03:15,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(<-|) [20110707 09:03:15:320/esbTestJmsTopicListener]: SampleDtoImpl[data_6;thread=Camel (camel-4) thread #12 - Threads]
> 2011-07-07 09:03:15,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(|->) [20110707 09:03:15:320/esbTestJmsTopicListener]: SampleDtoImpl[data_0;thread=Thread-4]
> 2011-07-07 09:03:16,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(<-|) [20110707 09:03:16:320/esbTestJmsTopicListener]: SampleDtoImpl[data_0;thread=Camel (camel-4) thread #13 - Threads]
> 2011-07-07 09:03:16,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(|->) [20110707 09:03:16:320/esbTestJmsTopicListener]: SampleDtoImpl[data_10;thread=Thread-14]
> 2011-07-07 09:03:17,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(<-|) [20110707 09:03:17:320/esbTestJmsTopicListener]: SampleDtoImpl[data_10;thread=Camel (camel-4) thread #13 - Threads]
> 2011-07-07 09:03:17,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(|->) [20110707 09:03:17:320/esbTestJmsTopicListener]: SampleDtoImpl[data_14;thread=Thread-18]
> 2011-07-07 09:03:18,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(<-|) [20110707 09:03:18:320/esbTestJmsTopicListener]: SampleDtoImpl[data_14;thread=Camel (camel-4) thread #15 - Threads]
> 2011-07-07 09:03:18,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(|->) [20110707 09:03:18:320/esbTestJmsTopicListener]: SampleDtoImpl[data_5;thread=Thread-9]
> 2011-07-07 09:03:19,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(<-|) [20110707 09:03:19:320/esbTestJmsTopicListener]: SampleDtoImpl[data_5;thread=Camel (camel-4) thread #11 - Threads]
> 2011-07-07 09:03:19,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(|->) [20110707 09:03:19:320/esbTestJmsTopicListener]: SampleDtoImpl[data_11;thread=Thread-15]
> 2011-07-07 09:03:20,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(<-|) [20110707 09:03:20:320/esbTestJmsTopicListener]: SampleDtoImpl[data_11;thread=Camel (camel-4) thread #12 - Threads]
> 2011-07-07 09:03:20,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #14 - Threads | Received(|->) [20110707 09:03:20:320/esbTestJmsTopicListener]: SampleDtoImpl[data_8;thread=Thread-12]
> 2011-07-07 09:03:21,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #14 - Threads | Received(<-|) [20110707 09:03:21:320/esbTestJmsTopicListener]: SampleDtoImpl[data_8;thread=Camel (camel-4) thread #14 - Threads]
> 2011-07-07 09:03:21,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(|->) [20110707 09:03:21:320/esbTestJmsTopicListener]: SampleDtoImpl[data_3;thread=Thread-7]
> 2011-07-07 09:03:22,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(<-|) [20110707 09:03:22:320/esbTestJmsTopicListener]: SampleDtoImpl[data_3;thread=Camel (camel-4) thread #13 - Threads]
> 2011-07-07 09:03:22,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(|->) [20110707 09:03:22:320/esbTestJmsTopicListener]: SampleDtoImpl[data_7;thread=Thread-11]
>

Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by enalposi <en...@yahoo.com>.
I sometimes use AMQ for isolated local testing but our production transport
is Tibco EMS.

But in any case, the Camel approach to multi-threading doesn't seem to be
working as expected, so I am not quite ready to make changes to the
underlying configuration. I could probably also set up a single listener
thread with a custom async worker pool on top.


Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by boday <be...@initekconsulting.com>.
you can always use AMQ virtual topics instead, then you can do concurrent
consumption because they are queues...see the camel durable subscriber EIP
for an example...
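
As a rough illustration of the virtual-topic idea, the sketch below only builds endpoint URI strings. It assumes an ActiveMQ broker with its default virtual-topic naming, where publishers send to a topic named VirtualTopic.<name> and each subscriber group reads from its own queue named Consumer.<group>.VirtualTopic.<name>; it does not apply to other brokers. VirtualTopicUris, publishUri and consumeUri are hypothetical names, and the jms: scheme is taken from the endpoints used elsewhere in this thread.

```java
// Hypothetical helper; names follow ActiveMQ's default virtual-topic convention.
final class VirtualTopicUris {

    /** Endpoint the publisher sends to: an ordinary topic with the VirtualTopic. prefix. */
    static String publishUri(String topic) {
        return "jms:topic:VirtualTopic." + topic;
    }

    /**
     * Endpoint one subscriber group consumes from. It is a queue, so the
     * concurrent consumers in that group compete for messages instead of
     * each receiving a copy.
     */
    static String consumeUri(String group, String topic, int concurrentConsumers) {
        return "jms:queue:Consumer." + group + ".VirtualTopic." + topic
                + "?concurrentConsumers=" + concurrentConsumers;
    }

    public static void main(String[] args) {
        System.out.println(publishUri("super_topic"));
        // jms:topic:VirtualTopic.super_topic
        System.out.println(consumeUri("hostA", "super_topic", 10));
        // jms:queue:Consumer.hostA.VirtualTopic.super_topic?concurrentConsumers=10
    }
}
```

Giving every host its own group name would let each host receive every message once, while the threads within a host share the work.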

_______________
Ben O'Day

SampleDtoImpl[data_14;thread=Camel (camel-4) thread #15 - Threads]
2011-07-07 09:03:18,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #11 - Threads | Received(|->)
[20110707 09:03:18:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_5;thread=Thread-9]
2011-07-07 09:03:19,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #11 - Threads | Received(<-|)
[20110707 09:03:19:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_5;thread=Camel (camel-4) thread #11 - Threads]
2011-07-07 09:03:19,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #12 - Threads | Received(|->)
[20110707 09:03:19:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_11;thread=Thread-15]
2011-07-07 09:03:20,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #12 - Threads | Received(<-|)
[20110707 09:03:20:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_11;thread=Camel (camel-4) thread #12 - Threads]
2011-07-07 09:03:20,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #14 - Threads | Received(|->)
[20110707 09:03:20:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_8;thread=Thread-12]
2011-07-07 09:03:21,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #14 - Threads | Received(<-|)
[20110707 09:03:21:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_8;thread=Camel (camel-4) thread #14 - Threads]
2011-07-07 09:03:21,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(|->)
[20110707 09:03:21:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_3;thread=Thread-7]
2011-07-07 09:03:22,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #13 - Threads | Received(<-|)
[20110707 09:03:22:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_3;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:22,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl |
process              | l (camel-4) thread #15 - Threads | Received(|->)
[20110707 09:03:22:320/esbTestJmsTopicListener]:
SampleDtoImpl[data_7;thread=Thread-11]






-----
Ben O'Day
IT Consultant -http://consulting-notes.com

--
View this message in context: http://camel.465427.n5.nabble.com/Each-concurrentConsumer-on-JMS-Topic-receives-ALL-msgs-tp4558687p4561241.html

Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by enalposi <en...@yahoo.com>.
Right, if I set concurrentConsumers=1 and add threads(10) while building the
route, I do end up with a thread pool; however, these threads are used
serially, one at a time.

The configuration is as follows:

        final String from = "jms:topic:esb_test_topic?concurrentConsumers=1";
        final String to   = "bean:jmsTopicListener?method=process";
        RouteBuilder builder = new RouteBuilder(camelCtx_) {
            @Override
            public void configure() throws Exception {
                onException(InvalidDestinationException.class, UnknownReplyMessageException.class)
                        .process(new CamelExceptionHandlerImpl(from, to)).stop();
                from(from).threads(5).setExchangePattern(pattern).to(to);
            }
        };

The 'listener' bean simply logs a message on receipt, sleeps for 1 second
(Thread.sleep) and then logs a second message. The output below shows the
threads being picked out of the pool one at a time, at 1-second intervals.

    public void process(Object _msg) {
        log.info(String.format("Received(|->) [%1$s/%2$s]: %3$s", df.format(new Date()), beanName_, _msg));
        if (sleepTime_ > 0) {
            if (_msg instanceof SampleDto) {
                SampleDto dto = (SampleDto) _msg;
                dto.setThreadName(Thread.currentThread().getName());
                data_.add(dto);
            }
            try {
                Thread.sleep(sleepTime_);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        log.info(String.format("Received(<-|) [%1$s/%2$s]: %3$s", df.format(new Date()), beanName_, _msg));
    }

Something in the Camel libs seems to be blocking...

2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-17                        | Sent [20110707 09:03:08:335]: SampleDtoImpl[data_13;thread=Thread-17]
2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-6                         | Sent [20110707 09:03:08:335]: SampleDtoImpl[data_2;thread=Thread-6]
2011-07-07 09:03:08,335|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-16                        | Sent [20110707 09:03:08:335]: SampleDtoImpl[data_12;thread=Thread-16]
2011-07-07 09:03:08,351|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-5                         | Sent [20110707 09:03:08:351]: SampleDtoImpl[data_1;thread=Thread-5]
2011-07-07 09:03:08,351|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-8                         | Sent [20110707 09:03:08:351]: SampleDtoImpl[data_4;thread=Thread-8]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-14                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_10;thread=Thread-14]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-13                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_9;thread=Thread-13]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-18                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_14;thread=Thread-18]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-4                         | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_0;thread=Thread-4]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-10                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_6;thread=Thread-10]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-15                        | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_11;thread=Thread-15]
2011-07-07 09:03:08,367|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-9                         | Sent [20110707 09:03:08:367]: SampleDtoImpl[data_5;thread=Thread-9]
2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-11                        | Sent [20110707 09:03:08:382]: SampleDtoImpl[data_7;thread=Thread-11]
2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-12                        | Sent [20110707 09:03:08:382]: SampleDtoImpl[data_8;thread=Thread-12]
2011-07-07 09:03:08,382|INFO | bus.bao.EsbSampleStandalonePublisherImpl | send                 | Thread-7                         | Sent [20110707 09:03:08:382]: SampleDtoImpl[data_3;thread=Thread-7]
2011-07-07 09:03:08,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(|->) [20110707 09:03:08:304/esbTestJmsTopicListener]: SampleDtoImpl[data_13;thread=Thread-17]
2011-07-07 09:03:09,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(<-|) [20110707 09:03:09:304/esbTestJmsTopicListener]: SampleDtoImpl[data_13;thread=Camel (camel-4) thread #11 - Threads]
2011-07-07 09:03:09,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(|->) [20110707 09:03:09:304/esbTestJmsTopicListener]: SampleDtoImpl[data_4;thread=Thread-8]
2011-07-07 09:03:10,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(<-|) [20110707 09:03:10:304/esbTestJmsTopicListener]: SampleDtoImpl[data_4;thread=Camel (camel-4) thread #12 - Threads]
2011-07-07 09:03:10,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(|->) [20110707 09:03:10:304/esbTestJmsTopicListener]: SampleDtoImpl[data_1;thread=Thread-5]
2011-07-07 09:03:11,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(<-|) [20110707 09:03:11:304/esbTestJmsTopicListener]: SampleDtoImpl[data_1;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:11,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #14 - Threads | Received(|->) [20110707 09:03:11:304/esbTestJmsTopicListener]: SampleDtoImpl[data_2;thread=Thread-6]
2011-07-07 09:03:12,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #14 - Threads | Received(<-|) [20110707 09:03:12:304/esbTestJmsTopicListener]: SampleDtoImpl[data_2;thread=Camel (camel-4) thread #14 - Threads]
2011-07-07 09:03:12,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(|->) [20110707 09:03:12:304/esbTestJmsTopicListener]: SampleDtoImpl[data_12;thread=Thread-16]
2011-07-07 09:03:13,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(<-|) [20110707 09:03:13:304/esbTestJmsTopicListener]: SampleDtoImpl[data_12;thread=Camel (camel-4) thread #15 - Threads]
2011-07-07 09:03:13,304|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(|->) [20110707 09:03:13:304/esbTestJmsTopicListener]: SampleDtoImpl[data_9;thread=Thread-13]
2011-07-07 09:03:14,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(<-|) [20110707 09:03:14:320/esbTestJmsTopicListener]: SampleDtoImpl[data_9;thread=Camel (camel-4) thread #11 - Threads]
2011-07-07 09:03:14,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(|->) [20110707 09:03:14:320/esbTestJmsTopicListener]: SampleDtoImpl[data_6;thread=Thread-10]
2011-07-07 09:03:15,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(<-|) [20110707 09:03:15:320/esbTestJmsTopicListener]: SampleDtoImpl[data_6;thread=Camel (camel-4) thread #12 - Threads]
2011-07-07 09:03:15,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(|->) [20110707 09:03:15:320/esbTestJmsTopicListener]: SampleDtoImpl[data_0;thread=Thread-4]
2011-07-07 09:03:16,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(<-|) [20110707 09:03:16:320/esbTestJmsTopicListener]: SampleDtoImpl[data_0;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:16,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(|->) [20110707 09:03:16:320/esbTestJmsTopicListener]: SampleDtoImpl[data_10;thread=Thread-14]
2011-07-07 09:03:17,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(<-|) [20110707 09:03:17:320/esbTestJmsTopicListener]: SampleDtoImpl[data_10;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:17,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(|->) [20110707 09:03:17:320/esbTestJmsTopicListener]: SampleDtoImpl[data_14;thread=Thread-18]
2011-07-07 09:03:18,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(<-|) [20110707 09:03:18:320/esbTestJmsTopicListener]: SampleDtoImpl[data_14;thread=Camel (camel-4) thread #15 - Threads]
2011-07-07 09:03:18,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(|->) [20110707 09:03:18:320/esbTestJmsTopicListener]: SampleDtoImpl[data_5;thread=Thread-9]
2011-07-07 09:03:19,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #11 - Threads | Received(<-|) [20110707 09:03:19:320/esbTestJmsTopicListener]: SampleDtoImpl[data_5;thread=Camel (camel-4) thread #11 - Threads]
2011-07-07 09:03:19,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(|->) [20110707 09:03:19:320/esbTestJmsTopicListener]: SampleDtoImpl[data_11;thread=Thread-15]
2011-07-07 09:03:20,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #12 - Threads | Received(<-|) [20110707 09:03:20:320/esbTestJmsTopicListener]: SampleDtoImpl[data_11;thread=Camel (camel-4) thread #12 - Threads]
2011-07-07 09:03:20,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #14 - Threads | Received(|->) [20110707 09:03:20:320/esbTestJmsTopicListener]: SampleDtoImpl[data_8;thread=Thread-12]
2011-07-07 09:03:21,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #14 - Threads | Received(<-|) [20110707 09:03:21:320/esbTestJmsTopicListener]: SampleDtoImpl[data_8;thread=Camel (camel-4) thread #14 - Threads]
2011-07-07 09:03:21,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(|->) [20110707 09:03:21:320/esbTestJmsTopicListener]: SampleDtoImpl[data_3;thread=Thread-7]
2011-07-07 09:03:22,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #13 - Threads | Received(<-|) [20110707 09:03:22:320/esbTestJmsTopicListener]: SampleDtoImpl[data_3;thread=Camel (camel-4) thread #13 - Threads]
2011-07-07 09:03:22,320|INFO | .bus.bao.EsbSampleStandaloneListenerImpl | process              | l (camel-4) thread #15 - Threads | Received(|->) [20110707 09:03:22:320/esbTestJmsTopicListener]: SampleDtoImpl[data_7;thread=Thread-11]
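
For illustration only: a minimal stand-alone sketch of how a pool of five
threads ends up running one task at a time when the caller waits for each
task to finish before handing over the next one. It uses plain
java.util.concurrent and no Camel or JMS classes; the class name, method
name and the 100 ms sleep are made up for this example. Elsewhere in this
thread the JmsConsumer is described as waiting for the Exchange to complete,
which would produce the same pattern, but the sketch only mimics that and
makes no claim about Camel's internals.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-alone sketch; it does not use Camel or JMS classes.
class BlockingHandoffSketch {

    // Hands `messages` tasks to a pool of five workers and returns the highest
    // number of tasks that were ever running at the same time.
    static int maxConcurrency(int messages, boolean waitForEachTask) {
        ExecutorService pool = Executors.newFixedThreadPool(5);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        Runnable work = () -> {
            int now = running.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            try {
                Thread.sleep(100); // stands in for the listener's sleep
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running.decrementAndGet();
            }
        };
        try {
            for (int i = 0; i < messages; i++) {
                Future<?> done = pool.submit(work);
                if (waitForEachTask) {
                    done.get(); // the caller blocks until this task completes
                }
            }
            pool.shutdown();
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdownNow();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("waiting for each task: peak = " + maxConcurrency(5, true));
        System.out.println("fire and forget:       peak = " + maxConcurrency(5, false));
    }
}
```

When the caller waits for each task, the peak stays at 1 even though the
pool has five threads; without the wait, several tasks overlap.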




--
View this message in context: http://camel.465427.n5.nabble.com/Each-concurrentConsumer-on-JMS-Topic-receives-ALL-msgs-tp4558687p4561025.html

Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by enalposi <en...@yahoo.com>.
Ok, makes sense. Cheers!

--
View this message in context: http://camel.465427.n5.nabble.com/Each-concurrentConsumer-on-JMS-Topic-receives-ALL-msgs-tp4558687p4558845.html

Re: Each concurrentConsumer on JMS Topic receives ALL msgs

Posted by boday <be...@initekconsulting.com>.
Yep, you need to configure the multi-threading in the downstream route
instead; otherwise you will get duplicate messages, because every concurrent
consumer on a topic receives its own copy of each message.

instead of this...

from("jms:topic:super_topic?concurrentConsumers=10")...

do this...

from("jms:topic:super_topic?concurrentConsumers=1&maxConcurrentConsumers=1").threads(10)...
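
To make the difference concrete, here is a small stand-alone sketch that
mimics the two setups with an in-memory stand-in for a topic. Nothing in it
is Camel or JMS API; the Topic class and the method names are hypothetical
and exist only for this illustration. It assumes plain topic semantics, where
every subscriber gets its own copy of each message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical in-memory stand-in for a JMS topic; not Camel or JMS API.
class TopicFanOutSketch {

    // Every subscriber registered here receives its own copy of each message.
    static final class Topic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            for (Consumer<String> subscriber : subscribers) {
                subscriber.accept(message);
            }
        }
    }

    // Ten subscribers on the topic: each message is handled ten times.
    static int handledWithTenSubscribers(List<String> messages) {
        AtomicInteger handled = new AtomicInteger();
        Topic topic = new Topic();
        for (int i = 0; i < 10; i++) {
            topic.subscribe(message -> handled.incrementAndGet());
        }
        messages.forEach(topic::publish);
        return handled.get();
    }

    // One subscriber that hands each message to a pool of ten workers:
    // each message is handled once, and different messages can overlap.
    static int handledWithOneSubscriberAndPool(List<String> messages) {
        AtomicInteger handled = new AtomicInteger();
        ExecutorService workers = Executors.newFixedThreadPool(10);
        Topic topic = new Topic();
        topic.subscribe(message -> workers.execute(() -> handled.incrementAndGet()));
        messages.forEach(topic::publish);
        workers.shutdown();
        try {
            if (!workers.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("workers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return handled.get();
    }

    public static void main(String[] args) {
        List<String> messages = Arrays.asList("data_0", "data_1", "data_2");
        System.out.println("10 subscribers:      " + handledWithTenSubscribers(messages));
        System.out.println("1 subscriber + pool: " + handledWithOneSubscriberAndPool(messages));
    }
}
```

With three messages, the ten-subscriber variant handles 30 deliveries, while
the single subscriber with a worker pool handles 3.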


enalposi wrote:
> 
> Hi - 
> 
> The answer may be as simple as 'that's the way it is', but what we observe
> is not ideal:
> Basically a pool of 'listeners'
> (jms:topic:super_topic?concurrentConsumers=10) all receive the same
> message, whereas the ideal scenario would be only one thread handles each
> msg and to process different messages concurrently in the thread pool.
> 
> This (obviously) works fine with queues since only one consumer can pick
> the msg off the transport, but we need several hosts to receive the
> payload. Is this expected behavior and we have to manage our own pool of
> async processing resources behind a single Topic listener thread?
> 
> Thanks!
> 


-----
Ben O'Day
IT Consultant -http://consulting-notes.com

--
View this message in context: http://camel.465427.n5.nabble.com/Each-concurrentConsumer-on-JMS-Topic-receives-ALL-msgs-tp4558687p4558803.html