You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by "Modanese, Riccardo" <Ri...@eurotech.com.INVALID> on 2021/09/17 10:13:57 UTC

AMQP to MQTT connector message exchange issue

Hi all,
   I’m facing a weird behavior when trying to exchange messages between devices (emulated by clients) connected to different ActiveMQ connectors (ActiveMQ version 5.14.5).
On one side I have an AMQP device emulated by qpid-jms library.
This device should exchange messages with another one connected through the MQTT connector.
The messages from AMQP client are published to the broker (I have a custom security filter and I can log every message published and in which topic) but don’t flow to the MQTT client.
I’m not sure if the virtual topic feature can be responsible of this.
I tried to publish messages with and without VirtualTopic prefix but without any success.

This is the connectors configuration:


<transportConnector name="mqtt" uri="mqtt+nio://0.0.0.0:1883?transport.defaultKeepAlive=60000&amp;transport.maximumConnections=1000&amp;transport.socketBufferSize=131072&amp;transport.ioBufferSize=16384&amp;transport.activeMQSubscriptionPrefetch=32766&amp;transport.publishDollarTopics=true&amp;transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"/>



<transportConnector name="amqp"uri="amqp+nio://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576&amp;transport.transformer=jms" />

Any ideas?

Riccardo

Re: AMQP to MQTT connector message exchange issue

Posted by Matt Pavlovich <ma...@gmail.com>.
Hey Riccardo-

You are welcome, happy to hear you are over the big hurdle.  Happy hacking, let us know how it goes =)

-Matt Pavlovich

> On Sep 17, 2021, at 10:20 AM, Modanese, Riccardo <Ri...@eurotech.com.INVALID> wrote:
> 
> Hi Matt, thanks for your help!
> 
> Just few minutes ago I realized which could be the cause. But it still sounds weird to me.
> The session was in auto_ack so the broker received the message from the client (and I was sure due to my security plugin log) but was waiting before delivery it to the subscriber.
> Switching to transacted mode (and committing the session after publishing the message) allows the consumer to receive the message.
> I'd like to investigate a little bit more to try to figure out why the auto_ack seems to not work as expected (may be since I keep the publisher alive for other operations but anyway the send method is finished).
> But for sure is not a message exchange issue between the AMQP and MQTT connectors as I was suspecting few hours ago (anyway I need to manipulate the topic prefixes since the VirtualTopic prefix is managed by the MQTT connector but not by the AMQP one)
> 
> Riccardo
> 
> Il giorno 17/09/21, 16:48 "Matt Pavlovich" <ma...@gmail.com> ha scritto:
> 
>    Hi Riccardo-
> 
>    The broker stats should help provide clues as to where the messages went. What are the destinations that have enqueue counts? Also, what do you see with consumer counts when the mqtt client connects?
> 
>    Note on the mqtt w/ virtual topic strategy — the VIrtualTopic config is hard-coded to use VirtualTopic.* and Consumer.*. for the topic and queue prefixes. Also, the mqtt clients connect _without_ the virtual topic prefixes. They connect up with just topic://FOO and then get routed to topic://VirtualTopic.FOO and then the queues.. queue://Consumer.*.VirtualTopic.FOO <queue://Consumer.*.VirtualTopic.FOO>. 
> 
>    Hope this helps!
> 
>    -Matt Pavlovich
> 
>> On Sep 17, 2021, at 5:13 AM, Modanese, Riccardo <Ri...@eurotech.com.INVALID> wrote:
>> 
>> Hi all,
>>  I’m facing a weird behavior when trying to exchange messages between devices (emulated by clients) connected to different ActiveMQ connectors (ActiveMQ version 5.14.5).
>> On one side I have an AMQP device emulated by qpid-jms library.
>> This device should exchange messages with another one connected through the MQTT connector.
>> The messages from AMQP client are published to the broker (I have a custom security filter and I can log every message published and in which topic) but don’t flow to the MQTT client.
>> I’m not sure if the virtual topic feature can be responsible of this.
>> I tried to publish messages with and without VirtualTopic prefix but without any success.
>> 
>> This is the connectors configuration:
>> 
>> 
>> <transportConnector name="mqtt" uri="mqtt+nio://0.0.0.0:1883?transport.defaultKeepAlive=60000&amp;transport.maximumConnections=1000&amp;transport.socketBufferSize=131072&amp;transport.ioBufferSize=16384&amp;transport.activeMQSubscriptionPrefetch=32766&amp;transport.publishDollarTopics=true&amp;transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"/>
>> 
>> 
>> 
>> <transportConnector name="amqp"uri="amqp+nio://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576&amp;transport.transformer=jms" />
>> 
>> Any ideas?
>> 
>> Riccardo
> 
> 


Re: AMQP to MQTT connector message exchange issue

Posted by "Modanese, Riccardo" <Ri...@eurotech.com.INVALID>.
Hi Matt, thanks for your help!

Just few minutes ago I realized which could be the cause. But it still sounds weird to me.
The session was in auto_ack so the broker received the message from the client (and I was sure due to my security plugin log) but was waiting before delivery it to the subscriber.
Switching to transacted mode (and committing the session after publishing the message) allows the consumer to receive the message.
I'd like to investigate a little bit more to try to figure out why the auto_ack seems to not work as expected (may be since I keep the publisher alive for other operations but anyway the send method is finished).
But for sure is not a message exchange issue between the AMQP and MQTT connectors as I was suspecting few hours ago (anyway I need to manipulate the topic prefixes since the VirtualTopic prefix is managed by the MQTT connector but not by the AMQP one)

Riccardo

Il giorno 17/09/21, 16:48 "Matt Pavlovich" <ma...@gmail.com> ha scritto:

    Hi Riccardo-

    The broker stats should help provide clues as to where the messages went. What are the destinations that have enqueue counts? Also, what do you see with consumer counts when the mqtt client connects?

    Note on the mqtt w/ virtual topic strategy — the VIrtualTopic config is hard-coded to use VirtualTopic.* and Consumer.*. for the topic and queue prefixes. Also, the mqtt clients connect _without_ the virtual topic prefixes. They connect up with just topic://FOO and then get routed to topic://VirtualTopic.FOO and then the queues.. queue://Consumer.*.VirtualTopic.FOO <queue://Consumer.*.VirtualTopic.FOO>. 

    Hope this helps!

    -Matt Pavlovich

    > On Sep 17, 2021, at 5:13 AM, Modanese, Riccardo <Ri...@eurotech.com.INVALID> wrote:
    > 
    > Hi all,
    >   I’m facing a weird behavior when trying to exchange messages between devices (emulated by clients) connected to different ActiveMQ connectors (ActiveMQ version 5.14.5).
    > On one side I have an AMQP device emulated by qpid-jms library.
    > This device should exchange messages with another one connected through the MQTT connector.
    > The messages from AMQP client are published to the broker (I have a custom security filter and I can log every message published and in which topic) but don’t flow to the MQTT client.
    > I’m not sure if the virtual topic feature can be responsible of this.
    > I tried to publish messages with and without VirtualTopic prefix but without any success.
    > 
    > This is the connectors configuration:
    > 
    > 
    > <transportConnector name="mqtt" uri="mqtt+nio://0.0.0.0:1883?transport.defaultKeepAlive=60000&amp;transport.maximumConnections=1000&amp;transport.socketBufferSize=131072&amp;transport.ioBufferSize=16384&amp;transport.activeMQSubscriptionPrefetch=32766&amp;transport.publishDollarTopics=true&amp;transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"/>
    > 
    > 
    > 
    > <transportConnector name="amqp"uri="amqp+nio://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576&amp;transport.transformer=jms" />
    > 
    > Any ideas?
    > 
    > Riccardo



Re: AMQP to MQTT connector message exchange issue

Posted by Matt Pavlovich <ma...@gmail.com>.
Hi Riccardo-

The broker stats should help provide clues as to where the messages went. What are the destinations that have enqueue counts? Also, what do you see with consumer counts when the mqtt client connects?

Note on the mqtt w/ virtual topic strategy — the VIrtualTopic config is hard-coded to use VirtualTopic.* and Consumer.*. for the topic and queue prefixes. Also, the mqtt clients connect _without_ the virtual topic prefixes. They connect up with just topic://FOO and then get routed to topic://VirtualTopic.FOO and then the queues.. queue://Consumer.*.VirtualTopic.FOO <queue://Consumer.*.VirtualTopic.FOO>. 

Hope this helps!

-Matt Pavlovich

> On Sep 17, 2021, at 5:13 AM, Modanese, Riccardo <Ri...@eurotech.com.INVALID> wrote:
> 
> Hi all,
>   I’m facing a weird behavior when trying to exchange messages between devices (emulated by clients) connected to different ActiveMQ connectors (ActiveMQ version 5.14.5).
> On one side I have an AMQP device emulated by qpid-jms library.
> This device should exchange messages with another one connected through the MQTT connector.
> The messages from AMQP client are published to the broker (I have a custom security filter and I can log every message published and in which topic) but don’t flow to the MQTT client.
> I’m not sure if the virtual topic feature can be responsible of this.
> I tried to publish messages with and without VirtualTopic prefix but without any success.
> 
> This is the connectors configuration:
> 
> 
> <transportConnector name="mqtt" uri="mqtt+nio://0.0.0.0:1883?transport.defaultKeepAlive=60000&amp;transport.maximumConnections=1000&amp;transport.socketBufferSize=131072&amp;transport.ioBufferSize=16384&amp;transport.activeMQSubscriptionPrefetch=32766&amp;transport.publishDollarTopics=true&amp;transport.subscriptionStrategy=mqtt-virtual-topic-subscriptions"/>
> 
> 
> 
> <transportConnector name="amqp"uri="amqp+nio://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=1048576&amp;transport.transformer=jms" />
> 
> Any ideas?
> 
> Riccardo