Posted to users@qpid.apache.org by Branden Smith <BS...@liaison.com> on 2013/10/11 23:21:56 UTC

AMQP-JMS client: messages dequeued as AmqpMessageImpl instances?

I'm using the Qpid AMQP-JMS client to both publish and subscribe to the same queue (provided by HornetQ 2.4.0.beta1). I'm publishing a message as a javax.jms.TextMessage, but the listener receives it as an org.apache.qpid.amqp_1_0.jms.impl.AmqpMessageImpl.

Publisher:

    queueSession = queueCnxn.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
    queueProducer = queueSession.createSender(eventQueue);
    queueProducer.send(queueSession.createTextMessage("test message"));

Consumer:

    public class TestConsumer implements MessageListener {
        @Override
        public void onMessage(Message message) {
            final String logPrefix;
            TextMessage txtMsg;

            logPrefix = "[onMessage] ";
            log.debug(">>> " + logPrefix);

            try {
                message.acknowledge();
                log.debug("message acknowledged");
                log.debug("message type: " + message.getClass());
                //log.debug("body assignable to String??? " + message.isBodyAssignableTo(String.class));
            } catch (JMSException jmsExc) {
                // TODO
                log.error("ack-exception!", jmsExc);
            }
            ...

Output:

    17:07:42.035 [Thread-4] DEBUG >>> [onMessage] 
    17:07:42.035 [Thread-4] DEBUG message acknowledged
    17:07:42.035 [Thread-4] DEBUG message type: class org.apache.qpid.amqp_1_0.jms.impl.AmqpMessageImpl

I'm not sure what to do with AmqpMessageImpl; it only inherits from MessageImpl, and only implements javax.jms.Message. Furthermore, if I attempt to call Message.isBodyAssignableTo(Class<T>), I get an AbstractMethodError. Trying to directly cast to TextMessage throws a ClassCastException.

The examples all show the consumer receiving the usual types defined by the JMS API; any thoughts as to what I could be doing wrong?

Thanks,

Branden Smith
bsmith@liaison.com


---------------------------------------------------------------------
To unsubscribe, e-mail: users-unsubscribe@qpid.apache.org
For additional commands, e-mail: users-help@qpid.apache.org


RE: AMQP-JMS client: messages dequeued as AmqpMessageImpl instances?

Posted by Branden Smith <BS...@liaison.com>.
Hi Rob,

Sorry for the delayed response. The acknowledgment problem was a bug in my code at the JMS level; I was requesting a transactional session, but failing to create a JTA transaction to accommodate it. I resolved the problem by switching to a non-transactional session, which should be sufficient for my use case.

Unfortunately, the message format/deserialization problem has returned; I have been unable to identify what temporarily seemed to resolve it.

When I send a TextMessage, my consumer receives it as an AmqpMessageImpl; meanwhile, when I send an ObjectMessage, the Qpid library does correctly identify it as an ObjectMessage, but I get this Exception when I attempt to retrieve the payload:

    09:46:37.689 [Thread-4] ERROR [TestConsumerListener] Exception in onMessage: javax.jms.JMSException: invalid stream header: 00000000
    javax.jms.JMSException: invalid stream header: 00000000
        at org.apache.qpid.amqp_1_0.jms.impl.ObjectMessageImpl.getObject(ObjectMessageImpl.java:124) ~[qpid-amqp-1-0-client-jms-0.24.jar:?]
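
For what it's worth, the text "invalid stream header: 00000000" is the message java.io.ObjectInputStream produces when the bytes it is handed do not start with the Java serialization magic number and version (0xACED, 0x0005). The sketch below reproduces the same message from four zero bytes. It assumes that ObjectMessageImpl.getObject() deserializes the body through an ObjectInputStream, which is not confirmed anywhere in this thread; StreamHeaderDemo and headerError are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.StreamCorruptedException;

// Hypothetical demo class; not part of Qpid or HornetQ.
class StreamHeaderDemo {

    // Opens the given bytes as a Java serialization stream and returns the
    // StreamCorruptedException message if the 4-byte header is rejected,
    // or null if the header is accepted.
    static String headerError(byte[] payload) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(payload))) {
            return null;
        } catch (StreamCorruptedException e) {
            return e.getMessage();
        } catch (IOException e) {
            // For example, fewer than four bytes were available.
            return "I/O error: " + e;
        }
    }

    public static void main(String[] args) {
        // Four zero bytes where the 0xACED0005 magic/version prefix should be.
        System.out.println(headerError(new byte[4]));
        // prints: invalid stream header: 00000000
    }
}
```

If that assumption holds, the body arriving at the client begins with zero bytes rather than a serialized object, which would suggest looking at how the payload is encoded on its way through the broker.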

I've posted a question to the HornetQ forum here (but unfortunately have not received any responses as yet):

https://community.jboss.org/thread/233613

I've posted the code for my test case here:

https://github.com/sumitsu/qpidamqpjmstest

My assumption has been that the Apache Qpid JMS client library should be able to connect to HornetQ's standard AMQP endpoint without further modification -- is that correct? Is there any particular requirement for the wire-level format of the message from HornetQ, beyond simply implementing the AMQP protocol?

Thanks,


Branden Smith
Lead Platform Architect
bsmith@liaison.com

Liaison Technologies, Inc.
3157 Royal Drive | Suite 200 | Alpharetta, Georgia 30022
www.liaison.com | 866.336.7378


Re: AMQP-JMS client: messages dequeued as AmqpMessageImpl instances?

Posted by Rob Godfrey <ro...@gmail.com>.
Hi Branden,


On 15 October 2013 00:35, Branden Smith <BS...@liaison.com> wrote:

> Hi Rob,
>
> Thanks for this reply as well; I restarted my broker, and now the messages
> are being received as TextMessages.  It's a bit disconcerting, but I'll
> have to wait to address it when/if it comes up again, since at the moment I
> can't reproduce.
>
>
Odd...


> I did have one additional problem wherein the broker is failing to delete
> messages after they are received; I suspect this is also a HornetQ problem
> of some sort, but if you have a moment, could you tell me how I could
> verify that Qpid is properly acknowledging the messages?  For each of the
> received messages, I see logs like this:
>
> FINE: SEND[(host)|0] :
> Disposition{role=receiver,first=2,last=2,settled=false,state=TransactionalState{txnId={oct},outcome=Accepted{}}}
> FINE: RECV[(host)|0] : Disposition{role=sender,first=2,last=2,settled=true}
>
> That looks like an acknowledgment to me, which seems to suggest that
> HornetQ should be removing the messages from the queue, but I'm not
> familiar with the meaning of "settled", so I thought I'd ask.
>
>
Yes - that is effectively the acknowledgement.  The client is saying it has
accepted the message, and the broker, by responding with settled=true,
should be indicating that it has received and processed the "Accepted"
response from the client.  The exchange above is a little unusual, though
not necessarily wrong.  The accept you are sending seems to be part of a
transaction.  As such I wouldn't really expect the disposition to be
settled until after the transaction is committed (settlement is essentially
saying that the state is irrevocable and the transfer will never be
mentioned again). However, the pattern shown above does have meaning; I'm
just not sure it's what HornetQ intended.  (If you are interested in the
protocol details, the low-level description of how transactions work is
given here:

http://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transactions-v1.0-os.html

The pattern that HornetQ seems to be indicating would be that given in
section 4.4.4 in the text

Delivery Sent Unsettled By Resource; Controller Does Not Settle

The resource might or might not settle the delivery before the transaction
is discharged. If the resource settles the delivery before the discharge
then the behavior after discharge is the same as the case above.
).
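
To check many deliveries rather than eyeballing the trace, the two frames discussed above can be reduced to the fields that matter: direction, role, delivery range, settled flag, and whether the state is transactional. The following is only a minimal sketch. DispositionTrace and summarize are hypothetical names, not part of the Qpid API, and the pattern assumes the FINE-level trace prints each Disposition frame on a single line in exactly the layout quoted in this thread.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for reading the client's FINE-level frame trace.
// It is not part of the Qpid API and assumes each frame is on one line.
class DispositionTrace {

    private static final Pattern DISPOSITION = Pattern.compile(
        "(SEND|RECV)\\[[^\\]]*\\] : Disposition\\{role=(\\w+),first=(\\d+),last=(\\d+),settled=(true|false)(.*)");

    // Summarizes the fields that matter for acknowledgement: who sent the
    // frame, which deliveries it covers, and whether it settles them.
    static String summarize(String line) {
        Matcher m = DISPOSITION.matcher(line);
        if (!m.find()) {
            return "not a disposition frame";
        }
        String rest = m.group(6);
        return String.format("%s role=%s deliveries=%s-%s settled=%s transactional=%b accepted=%b",
            m.group(1), m.group(2), m.group(3), m.group(4), m.group(5),
            rest.contains("TransactionalState"), rest.contains("Accepted"));
    }

    public static void main(String[] args) {
        System.out.println(summarize("FINE: SEND[(host)|0] : Disposition{role=receiver,first=2,last=2,settled=false,"
            + "state=TransactionalState{txnId={oct},outcome=Accepted{}}}"));
        System.out.println(summarize("FINE: RECV[(host)|0] : Disposition{role=sender,first=2,last=2,settled=true}"));
    }
}
```

A SEND line with transactional=true and settled=false, answered by a RECV line with settled=true for the same delivery range, is the pattern described above: the accept is tied to a transaction, so its outcome only becomes final once that transaction is discharged.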

Are you committing your transaction later, and what is the trace associated
with that?

Cheers,
Rob



RE: AMQP-JMS client: messages dequeued as AmqpMessageImpl instances?

Posted by Branden Smith <BS...@liaison.com>.
Hi Rob,

Thanks for this reply as well; I restarted my broker, and now the messages are being received as TextMessages.  It's a bit disconcerting, but I'll have to wait to address it when/if it comes up again, since at the moment I can't reproduce.

I did have one additional problem wherein the broker is failing to delete messages after they are received; I suspect this is also a HornetQ problem of some sort, but if you have a moment, could you tell me how I could verify that Qpid is properly acknowledging the messages?  For each of the received messages, I see logs like this:

FINE: SEND[(host)|0] : Disposition{role=receiver,first=2,last=2,settled=false,state=TransactionalState{txnId={oct},outcome=Accepted{}}}
FINE: RECV[(host)|0] : Disposition{role=sender,first=2,last=2,settled=true}

That looks like an acknowledgment to me, which seems to suggest that HornetQ should be removing the messages from the queue, but I'm not familiar with the meaning of "settled", so I thought I'd ask.

Thanks again.

Branden Smith
bsmith@liaison.com





Re: AMQP-JMS client: messages dequeued as AmqpMessageImpl instances?

Posted by Rob Godfrey <ro...@gmail.com>.
Hi Branden,

A message will be returned as an AmqpMessageImpl if the client can't work
out what sort of JMS Message it is supposed to represent.

The logic in the client for detecting a TextMessage is as follows:

    else if (bodySection instanceof AmqpValue && ((AmqpValue) bodySection).getValue() instanceof String)
    {
        message = new TextMessageImpl(header, messageAnnotations, properties, appProperties,
                                      (String) ((AmqpValue) bodySection).getValue(), footer, _session);
    }

That is, the client expects a TextMessage to be represented in AMQP as
a message with an amqp-value section where the value is of type String.  It
seems as if HornetQ is not sending the messages in that form, and thus the
client is not recognising the message as a TextMessage.
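
The effect of that check can be seen in isolation with stand-in types. This is only a sketch: AmqpValue and Data below are hypothetical local classes written for this example, not the Qpid ones, and only the TextMessage branch quoted above is modelled. It shows that the same text is recognised when it arrives as a String inside an amqp-value section, but not when it arrives as raw bytes or as a non-String value.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for AMQP 1.0 body sections; NOT the Qpid classes.
class BodyDispatchSketch {

    static final class AmqpValue {
        private final Object value;
        AmqpValue(Object value) { this.value = value; }
        Object getValue() { return value; }
    }

    static final class Data {
        private final byte[] bytes;
        Data(byte[] bytes) { this.bytes = bytes; }
        byte[] getBytes() { return bytes; }
    }

    // Mirrors only the TextMessage test quoted above: an amqp-value
    // section whose value is a String. Other branches are not modelled.
    static boolean looksLikeTextMessage(Object bodySection) {
        return bodySection instanceof AmqpValue
            && ((AmqpValue) bodySection).getValue() instanceof String;
    }

    public static void main(String[] args) {
        byte[] raw = "test message".getBytes(StandardCharsets.UTF_8);
        System.out.println(looksLikeTextMessage(new AmqpValue("test message"))); // true
        System.out.println(looksLikeTextMessage(new Data(raw)));                 // false
        System.out.println(looksLikeTextMessage(new AmqpValue(42)));             // false
    }
}
```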

Hope this helps,
Rob

