Posted to users@qpid.apache.org by Erik Aschenbrenner <ea...@icubic.de> on 2015/01/22 09:48:54 UTC

Qpid JMS: Message listener blocks other message listener on different session

Dear Qpid Users,

I have an application where I'm using different message listeners to process
data from different topics.

For example I have a message listener which processes reference data
received via a broadcast topic and another message listener which processes
market data like trades and orders received on another broadcast topic.

Both message listeners are created by a dedicated JMS session (one session
for each message listener).

Because the market data can only be processed if all the reference data was
processed initially, the message listener for the market data should wait
and block if the reference data was not received completely.
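One way to get this ordering without blocking inside the market data listener itself is to move the wait onto a worker thread. The following is only a minimal sketch under stated assumptions: the class ReferenceDataGate, its method names, and the "last" flag that marks the end of the reference data are all hypothetical (the thread does not say how completeness is detected), and plain strings stand in for JMS messages so the example needs no JMS jar.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ReferenceDataGate {
    // Opened once the (hypothetical) last reference data item has been seen.
    private final CountDownLatch referenceDataComplete = new CountDownLatch(1);
    // Single worker thread: market data is processed in arrival order.
    private final ExecutorService marketWorker = Executors.newSingleThreadExecutor();
    private final List<String> log = new ArrayList<>(); // guarded by synchronized (log)

    /** Would be called from the reference data listener; it never blocks. */
    public void onReferenceData(String item, boolean last) {
        record("ref:" + item);
        if (last) {
            referenceDataComplete.countDown();
        }
    }

    /** Would be called from the market data listener; it only enqueues and returns. */
    public void onMarketData(String item) {
        marketWorker.execute(() -> {
            try {
                referenceDataComplete.await(); // the wait happens on the worker thread
                record("mkt:" + item);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    private void record(String entry) {
        synchronized (log) {
            log.add(entry);
        }
    }

    /** Stops the worker and returns a copy of the processing order. */
    public List<String> finish() throws InterruptedException {
        marketWorker.shutdown();
        marketWorker.awaitTermination(2, TimeUnit.SECONDS);
        synchronized (log) {
            return new ArrayList<>(log);
        }
    }

    /** Market data arrives before the reference data is complete. */
    public static List<String> demo() throws InterruptedException {
        ReferenceDataGate gate = new ReferenceDataGate();
        gate.onMarketData("trade-1");
        gate.onReferenceData("instrument-A", false);
        gate.onReferenceData("instrument-B", true);
        gate.onMarketData("order-1");
        return gate.finish();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

In this sketch the early trade is held on the worker until both reference items have been recorded, so the delivery threads themselves are never parked.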

So here is the problem: if the thread of the message listener for market data
is blocked, receiving of the reference data is also blocked and no other
data is received on the reference data listener.

Why are there dependencies between message listeners on different sessions?
As far as I understand, each session should have its own thread?

Regards,
Erik




--
View this message in context: http://qpid.2158936.n2.nabble.com/Qpid-JMS-Message-listener-blocks-other-message-listener-on-different-session-tp7618696.html
Sent from the Apache Qpid users mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


Re: Qpid JMS: Message listener blocks other message listener on different session

Posted by Rob Godfrey <ro...@gmail.com>.
The work around suggested by Keith would work - the issue is that there is
a single lock for each connection, and the lock is held for the duration of
the time the thread is within onMessage().  Separate connections would use
separate locks and thus not face this issue.
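The effect of such a connection-scoped lock can be illustrated with a small model. This is not the Qpid client code: ConnectionLockModel and fastListenerDelivers are hypothetical names, plain Java monitors stand in for the per-connection lock, and a latch stands in for a blocking onMessage(). It is only a sketch of why two dispatchers sharing one lock stall each other while dispatchers with separate locks do not.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ConnectionLockModel {

    /**
     * Starts a "slow" dispatcher that blocks while holding slowLock, then a
     * "fast" dispatcher that needs fastLock. Returns true if the fast
     * dispatcher delivered within 300 ms while the slow one was still blocked.
     */
    public static boolean fastListenerDelivers(Object slowLock, Object fastLock)
            throws InterruptedException {
        CountDownLatch slowInside = new CountDownLatch(1);    // slow callback has started
        CountDownLatch release = new CountDownLatch(1);       // lets the slow callback return
        CountDownLatch fastDelivered = new CountDownLatch(1); // fast callback has run

        Thread slow = new Thread(() -> {
            synchronized (slowLock) {      // lock is held for the whole callback
                slowInside.countDown();
                try {
                    release.await();       // stands in for a blocking onMessage()
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        Thread fast = new Thread(() -> {
            synchronized (fastLock) {      // must acquire its lock before delivering
                fastDelivered.countDown();
            }
        });

        slow.start();
        slowInside.await();
        fast.start();
        boolean delivered = fastDelivered.await(300, TimeUnit.MILLISECONDS);

        release.countDown();               // unblock the slow callback and clean up
        slow.join();
        fast.join();
        return delivered;
    }

    public static void main(String[] args) throws InterruptedException {
        Object shared = new Object();
        // One lock for both dispatchers, as with two sessions on one connection.
        System.out.println("shared lock:    " + fastListenerDelivers(shared, shared));
        // One lock each, as with one connection per listener.
        System.out.println("separate locks: " + fastListenerDelivers(new Object(), new Object()));
    }
}
```

With the real client, the workaround would presumably amount to calling createConnection() once per listener and creating each session from its own connection, at the cost of one extra socket per listener.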

In terms of a timeline for the new JMS client, I'll defer to Robbie and Tim,
who are the ones working on that.

Cheers,
Rob

On 30 January 2015 at 11:45, Erik Aschenbrenner <ea...@icubic.de> wrote:

> Hi Rob,
>
> thanks for this information.
>
> Is there a plan for when the Proton-based JMS AMQP 1.0 client will be
> available?
>
> Do you think that the workaround suggested by Keith (using a dedicated
> connection for each message listener) would work?
>
> Regards,
> Erik

Re: Qpid JMS: Message listener blocks other message listener on different session

Posted by Erik Aschenbrenner <ea...@icubic.de>.
Hi Rob,

thanks for this information. 

Is there a plan for when the Proton-based JMS AMQP 1.0 client will be available?

Do you think that the workaround suggested by Keith (using a dedicated
connection for each message listener) would work?

Regards,
Erik







Re: Qpid JMS: Message listener blocks other message listener on different session

Posted by Rob Godfrey <ro...@gmail.com>.
Hi Erik,

due to the nature of this JMS client I don't think it will be possible for
us to easily fix the locking issue.

The client is built upon an initial non-JMS API I put together for initial
proving out of the AMQP 1.0 protocol. I later wrote a wrapper around that
non-JMS API to provide JMS functionality. Unfortunately it's unlikely we
could change the locking model without a fairly major rewrite since the
locking assumptions are in that underlying non-JMS part.

As Keith has already mentioned, there is already work underway to write a
new JMS AMQP 1.0 client built on top of the Qpid Proton library which will
(when complete) replace the existing JMS AMQP 1.0 client.

In the meantime we'll continue to support the existing JMS AMQP 1.0 client
with small enhancements and bug fixes; however, for larger changes like this
I think it's better to invest the time in the new Proton-based solution.

Apologies,
Rob

On 30 January 2015 at 08:33, Erik Aschenbrenner <ea...@icubic.de> wrote:

> Hi Keith,
>
> thanks for your answer. I changed my application code so that the message
> listeners no longer block and instead delegate the message processing to
> other threads as fast as possible.
>
> I may raise a Jira ticket if I still notice problems with this issue.
>
> Regards,
> Erik

Re: Qpid JMS: Message listener blocks other message listener on different session

Posted by Erik Aschenbrenner <ea...@icubic.de>.
Hi Keith,

thanks for your answer. I changed my application code so that the message
listeners no longer block and instead delegate the message processing to
other threads as fast as possible.
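A hand-off of this kind could look roughly like the sketch below. It is an illustration under stated assumptions, not the code from this application: the Listener interface is a hypothetical stand-in for javax.jms.MessageListener (so the example compiles without a JMS jar), messages are plain strings, and AsyncListener is an invented name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

public class HandOffListener {

    /** Hypothetical stand-in for javax.jms.MessageListener. */
    public interface Listener {
        void onMessage(String message);
    }

    /** Listener whose onMessage() only enqueues the message and returns at once. */
    public static final class AsyncListener implements Listener {
        // A single worker thread keeps messages in arrival order.
        private final ExecutorService worker = Executors.newSingleThreadExecutor();
        private final Consumer<String> processor;

        public AsyncListener(Consumer<String> processor) {
            this.processor = processor;
        }

        @Override
        public void onMessage(String message) {
            worker.execute(() -> processor.accept(message));
        }

        /** Stops accepting work and waits for queued messages to be processed. */
        public boolean shutdownAndWait(long timeoutMillis) throws InterruptedException {
            worker.shutdown();
            return worker.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        }
    }

    /** Feeds two messages through a deliberately slow processor. */
    public static List<String> demo() throws InterruptedException {
        List<String> processed = new CopyOnWriteArrayList<>();
        AsyncListener listener = new AsyncListener(msg -> {
            try {
                Thread.sleep(50); // simulated slow processing, off the delivery thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            processed.add(msg);
        });
        listener.onMessage("trade-1");
        listener.onMessage("trade-2");
        listener.shutdownAndWait(2000);
        return new ArrayList<>(processed);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

One trade-off to keep in mind with a real JMS session: in AUTO_ACKNOWLEDGE mode a message is acknowledged once onMessage() returns, so after a hand-off the acknowledgement can precede the actual processing.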

I may raise a Jira ticket if I still notice problems with this issue.

Regards,
Erik





Re: Qpid JMS: Message listener blocks other message listener on different session

Posted by Keith W <ke...@gmail.com>.
Hi Erik,

I think your understanding of JMS is correct. Message listeners on
independent sessions should provide for concurrent message delivery [1].
Application code blocking in one onMessage() should not affect delivery to
another on a separate session.

As you have observed, this is not the case with the current AMQP 1.0 client
(qpid-amqp-1-0-client-jms-x.xx). An internal connection-scoped lock is
shared by the message delivery paths. To work around this issue, you
could use independent connections. If you wish, raise a Jira and submit a
patch. To the best of my knowledge, most attention in the AMQP 1.0 JMS
client area is being directed towards the new JMS 2.0 client.

Hope this helps, Keith.

[1] JMS 1.1 Spec, 4.17 Concurrent Message Delivery

On 28 January 2015 at 10:57, Erik Aschenbrenner <ea...@icubic.de> wrote:

>
> Ok, here are the missing details:
>
> I'm using the Java Qpid JMS client for AMQP 1.0 (version 0.30).
>
> The broker I'm connecting to also uses AMQP 1.0 and is from Red Hat (as far
> as I know). Actually, I don't have any information about the broker other
> than the AMQP version.
>
> To make the problem clearer, here is a simpler setup of the problem:
>
> I have 2 topic message listeners listening to different topics. If listener 1
> receives a message, it calls Thread.sleep(Long.MAX_VALUE) in its onMessage()
> method in my test case.
> From now on, listener 2 never gets a message from the AMQP broker again.
>
> I attached the stack trace at the end.
>
> Thanks in advance,
> Erik
>
> PS: I have now changed my code so that listener 1 is no longer blocking, but
> I would still like to know what the problem is here, because I thought that
> the message listeners should be independent.

Re: Qpid JMS: Message listener blocks other message listener on different session

Posted by Erik Aschenbrenner <ea...@icubic.de>.
Ok, here are the missing details:

I'm using the Java Qpid JMS client for AMQP 1.0 (version 0.30).

The broker I'm connecting to also uses AMQP 1.0 and is from Red Hat (as far
as I know). Actually, I don't have any information about the broker other
than the AMQP version.

To make the problem clearer, here is a simpler setup of the problem:

I have 2 topic message listeners listening to different topics. If listener 1
receives a message, it calls Thread.sleep(Long.MAX_VALUE) in its onMessage()
method in my test case.
From now on, listener 2 never gets a message from the AMQP broker again.

I attached the stack trace at the end.

Thanks in advance,
Erik

PS: I have now changed my code so that listener 1 is no longer blocking, but
I would still like to know what the problem is here, because I thought that
the message listeners should be independent.



2015-01-28 11:40:12
Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.65-b04 mixed mode):

"RMI TCP Connection(2)-192.168.3.33" daemon prio=6 tid=0x0000000011f6e000 nid=0x21bc runnable [0x00000000148af000]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.io.BufferedInputStream.fill(Unknown Source)
	at java.io.BufferedInputStream.read(Unknown Source)
	- locked <0x00000007b03e6e68> (a java.io.BufferedInputStream)
	at java.io.FilterInputStream.read(Unknown Source)
	at sun.rmi.transport.tcp.TCPTransport.handleMessages(Unknown Source)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(Unknown Source)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- <0x00000007afe904a0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"JMX server connection timeout 18" daemon prio=6 tid=0x0000000011f73000 nid=0x2128 in Object.wait() [0x00000000146ef000]
   java.lang.Thread.State: TIMED_WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x00000007b01435a8> (a [I)
	at com.sun.jmx.remote.internal.ServerCommunicatorAdmin$Timeout.run(Unknown Source)
	- locked <0x00000007b01435a8> (a [I)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"RMI Scheduler(0)" daemon prio=6 tid=0x0000000012308000 nid=0x2290 waiting on condition [0x00000000144ff000]
   java.lang.Thread.State: TIMED_WAITING (parking)
	at sun.misc.Unsafe.park(Native Method)
	- parking to wait for  <0x00000007afe49618> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
	at java.util.concurrent.locks.LockSupport.parkNanos(Unknown Source)
	at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(Unknown Source)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown Source)
	at java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.getTask(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"RMI TCP Connection(1)-192.168.3.33" daemon prio=6 tid=0x000000001217d800 nid=0x2138 runnable [0x00000000143be000]
   java.lang.Thread.State: RUNNABLE
	at java.net.SocketInputStream.socketRead0(Native Method)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.net.SocketInputStream.read(Unknown Source)
	at java.io.BufferedInputStream.fill(Unknown Source)
	at java.io.BufferedInputStream.read(Unknown Source)
	- locked <0x00000007b00ef710> (a java.io.BufferedInputStream)
	at java.io.FilterInputStream.read(Unknown Source)
	at sun.rmi.transport.tcp.TCPTransport.handleMessages(Unknown Source)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run0(Unknown Source)
	at sun.rmi.transport.tcp.TCPTransport$ConnectionHandler.run(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- <0x00000007afe8e0d0> (a java.util.concurrent.ThreadPoolExecutor$Worker)

"RMI TCP Accept-0" daemon prio=6 tid=0x0000000013158800 nid=0x9c4 runnable [0x000000001421e000]
   java.lang.Thread.State: RUNNABLE
	at java.net.DualStackPlainSocketImpl.accept0(Native Method)
	at java.net.DualStackPlainSocketImpl.socketAccept(Unknown Source)
	at java.net.AbstractPlainSocketImpl.accept(Unknown Source)
	at java.net.PlainSocketImpl.accept(Unknown Source)
	- locked <0x00000007afe4ee58> (a java.net.SocksSocketImpl)
	at java.net.ServerSocket.implAccept(Unknown Source)
	at java.net.ServerSocket.accept(Unknown Source)
	at sun.management.jmxremote.LocalRMIServerSocketFactory$1.accept(Unknown Source)
	at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.executeAcceptLoop(Unknown Source)
	at sun.rmi.transport.tcp.TCPTransport$AcceptLoop.run(Unknown Source)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"Thread-3" prio=6 tid=0x0000000011f5b000 nid=0x21b8 waiting on condition [0x0000000012fbe000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at de.amqptest.connection.MasterDataRetrievalTest$1.onMessage(MasterDataRetrievalTest.java:306)
	at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:942)
	- locked <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"Thread-2" prio=6 tid=0x0000000011f7d000 nid=0x1554 in Object.wait() [0x0000000012d9f000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
	at java.lang.Object.wait(Object.java:503)
	at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:901)
	- locked <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"Thread-1" prio=6 tid=0x0000000011e9b000 nid=0x1804 in Object.wait() [0x0000000012baf000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
	at java.lang.Object.wait(Object.java:503)
	at org.apache.qpid.amqp_1_0.jms.impl.SessionImpl$Dispatcher.run(SessionImpl.java:901)
	- locked <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"QpidConnectionInputThread-0" daemon prio=6 tid=0x000000001190f000 nid=0x23d4 waiting for monitor entry [0x0000000011cef000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint.closedForInput(ConnectionEndpoint.java:730)
	- waiting to lock <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
	at org.apache.qpid.amqp_1_0.framing.FrameHandler.isDone(FrameHandler.java:328)
	at org.apache.qpid.amqp_1_0.framing.ConnectionHandler.isDone(ConnectionHandler.java:83)
	at org.apache.qpid.amqp_1_0.client.TCPTransportProvier.doRead(TCPTransportProvier.java:215)
	at org.apache.qpid.amqp_1_0.client.TCPTransportProvier.access$000(TCPTransportProvier.java:43)
	at org.apache.qpid.amqp_1_0.client.TCPTransportProvier$1.run(TCPTransportProvier.java:158)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"QpidConnectionOutputThread-0" daemon prio=6 tid=0x00000000116b2000 nid=0x2364 waiting for monitor entry [0x00000000128cf000]
   java.lang.Thread.State: BLOCKED (on object monitor)
	at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput.getNextFrame(ConnectionHandler.java:207)
	- waiting to lock <0x0000000700a0e518> (a org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint)
	at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameOutput$2.getNextFrame(ConnectionHandler.java:125)
	at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$FrameToBytesSourceAdapter.getBytes(ConnectionHandler.java:317)
	at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$SequentialBytesSource.getBytes(ConnectionHandler.java:406)
	- locked <0x0000000700a1a570> (a org.apache.qpid.amqp_1_0.framing.ConnectionHandler$SequentialBytesSource)
	at org.apache.qpid.amqp_1_0.framing.ConnectionHandler$BytesOutputHandler.run(ConnectionHandler.java:508)
	at java.lang.Thread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"Service Thread" daemon prio=6 tid=0x000000000f94e800 nid=0x1c8c runnable
[0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"C2 CompilerThread1" daemon prio=10 tid=0x000000000f94e000 nid=0x4dc waiting
on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"C2 CompilerThread0" daemon prio=10 tid=0x000000000f949000 nid=0x1080
waiting on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"Attach Listener" daemon prio=10 tid=0x000000000f947000 nid=0x1e00 waiting
on condition [0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"Signal Dispatcher" daemon prio=10 tid=0x000000000f940000 nid=0xff4 runnable
[0x0000000000000000]
   java.lang.Thread.State: RUNNABLE

   Locked ownable synchronizers:
	- None

"Finalizer" daemon prio=8 tid=0x000000000f8e7000 nid=0x1a88 in Object.wait()
[0x0000000010ddf000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000700a11190> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(Unknown Source)
	- locked <0x0000000700a11190> (a java.lang.ref.ReferenceQueue$Lock)
	at java.lang.ref.ReferenceQueue.remove(Unknown Source)
	at java.lang.ref.Finalizer$FinalizerThread.run(Unknown Source)

   Locked ownable synchronizers:
	- None

"Reference Handler" daemon prio=10 tid=0x000000000f8e0000 nid=0x170 in Object.wait() [0x0000000010b1f000]
   java.lang.Thread.State: WAITING (on object monitor)
	at java.lang.Object.wait(Native Method)
	- waiting on <0x0000000700a10ba8> (a java.lang.ref.Reference$Lock)
	at java.lang.Object.wait(Object.java:503)
	at java.lang.ref.Reference$ReferenceHandler.run(Unknown Source)
	- locked <0x0000000700a10ba8> (a java.lang.ref.Reference$Lock)

   Locked ownable synchronizers:
	- None

"main" prio=6 tid=0x00000000024be800 nid=0x1354 waiting on condition [0x00000000027af000]
   java.lang.Thread.State: TIMED_WAITING (sleeping)
	at java.lang.Thread.sleep(Native Method)
	at de.amqptest.connection.MasterDataRetrievalTest.doWait(MasterDataRetrievalTest.java:179)
	at de.amqptest.connection.MasterDataRetrievalTest.run(MasterDataRetrievalTest.java:141)
	at de.amqptest.connection.MasterDataRetrievalTest.main(MasterDataRetrievalTest.java:85)

   Locked ownable synchronizers:
	- None

"VM Thread" prio=10 tid=0x000000000f8d8800 nid=0xd70 runnable 

"GC task thread#0 (ParallelGC)" prio=6 tid=0x000000000250d000 nid=0x10f4 runnable

"GC task thread#1 (ParallelGC)" prio=6 tid=0x000000000250f000 nid=0x2310 runnable

"GC task thread#2 (ParallelGC)" prio=6 tid=0x0000000002510800 nid=0x1ba4 runnable

"GC task thread#3 (ParallelGC)" prio=6 tid=0x0000000002512000 nid=0x19a4 runnable

"VM Periodic Task Thread" prio=10 tid=0x000000000f959000 nid=0x114c waiting on condition

JNI global references: 169









Re: Qpid JMS: Message listener blocks other message listener on different session

Posted by Keith W <ke...@gmail.com>.
Hi Erik

Can you tell us a little more about your software stack?

Which broker and version are you using (Qpid Cpp, Qpid Java, or something
non-Qpid)?

There are two distinct Qpid JMS clients at the moment: the 0-8..0-10 client
(i.e. qpid-client-x.xx-bin.tar.gz) and the AMQP 1.0 client (i.e.
qpid-amqp-1-0-client-jms-x.xx-bin.tar.gz). I guess from your previous list
interactions that you are using the latter. Can you confirm?

Do you have a thread dump (jstack -l) taken while the problem occurs?
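
If running jstack against the process is not convenient, a comparable dump
can be produced from inside the JVM. The following is only a minimal sketch
using the standard java.lang.management API; the class name ThreadDumpSketch
and the output layout are made up for illustration and are not part of any
Qpid client. It lists each thread's state, the lock it is waiting on, its
stack, and the monitors and ownable synchronizers it holds.

```java
import java.lang.management.LockInfo;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;

// Hypothetical helper, not part of Qpid: an in-process approximation of "jstack -l".
public class ThreadDumpSketch {

    /** Returns a text dump of all live threads, including lock information where supported. */
    public static String dump() {
        ThreadMXBean threads = ManagementFactory.getThreadMXBean();
        // Only request lock details the running JVM is able to report.
        boolean monitors = threads.isObjectMonitorUsageSupported();
        boolean synchronizers = threads.isSynchronizerUsageSupported();

        StringBuilder out = new StringBuilder();
        for (ThreadInfo info : threads.dumpAllThreads(monitors, synchronizers)) {
            out.append('"').append(info.getThreadName()).append("\" state=")
               .append(info.getThreadState()).append('\n');

            // The lock this thread is blocked or waiting on, if any.
            LockInfo waitingOn = info.getLockInfo();
            if (waitingOn != null) {
                out.append("   waiting on ").append(waitingOn);
                if (info.getLockOwnerName() != null) {
                    out.append(" owned by \"").append(info.getLockOwnerName()).append('"');
                }
                out.append('\n');
            }

            // Full stack trace (ThreadInfo.toString() would truncate it).
            for (StackTraceElement frame : info.getStackTrace()) {
                out.append("\tat ").append(frame).append('\n');
            }

            // Object monitors and java.util.concurrent locks held by this thread.
            for (LockInfo held : info.getLockedMonitors()) {
                out.append("   holds monitor ").append(held).append('\n');
            }
            for (LockInfo held : info.getLockedSynchronizers()) {
                out.append("   holds synchronizer ").append(held).append('\n');
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.print(dump());
    }
}
```

Calling ThreadDumpSketch.dump() from a spare thread (for example a timer)
while the listener is blocked should show which thread owns the lock the
other listener is waiting for.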

Kind regards, Keith



On 22 January 2015 at 08:48, Erik Aschenbrenner <ea...@icubic.de> wrote:
>
> Dear Qpid Users,
>
> I have an application where I'm using different message listeners to
> process data from different topics.
>
> For example I have a message listener which processes reference data
> received via a broadcast topic and another message listener which
> processes market data like trades and orders received on another
> broadcast topic.
>
> Both message listeners are created by a dedicated JMS session (one session
> for each message listener).
>
> Because the market data can only be processed if all the reference data
> was processed initially, the message listener for the market data should
> wait and block if the reference data was not received completely.
>
> So here is the problem: if the thread of the message listener for market
> data is blocked, the receiving of the reference data is also blocked and
> no other data is received on the reference data listener.
>
> Why are there dependencies between message listeners on different sessions?
> As far as I understand, each session should have its own thread?
>
> Regards,
> Erik