You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by "Modanese, Riccardo" <Ri...@eurotech.com.INVALID> on 2022/04/20 09:28:58 UTC

Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

Hello,
    after switching from Artemis broker 2.20 to 2.21 we experienced an issue with message delivery.
It looks like MQTT devices don’t receive the messages matching their subscriptions.

The test code (***) run with an Artemis broker 2.19 or 2.20 as target (wildcard addresses modified as (**)) produces the correct output:
waiting for messages
Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
===
Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
===
Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
…

With broker 2.21 or 2.22 (configuration changes described in (*) ) as target the output is:
waiting for messages
===
Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
===
Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
===
Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
…

So the broker doesn’t send any message to the clients.

Maybe we missed some configuration that is required from version 2.21 onward?

Regards,
Riccardo Modanese



(*) The 2.21 and 2.22 default broker.xml configuration file has changed in this way:
set the broker name (message-broker)
removed double connector bound to 1883 (the broker with the default configuration crashed)
allow only MQTT protocol for connector bound to 1883 port
removed broadcast connector and configuration
added custom wildcard configuration (**)

(**) <wildcard-addresses>
         <routing-enabled>true</routing-enabled>
         <delimiter>/</delimiter>
          <any-words>#</any-words>
          <single-word>+</single-word>
      </wildcard-addresses>

(***)
/*******************************************************************************
* Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
*
* This program and the accompanying materials are made
* available under the terms of the Eclipse Public License 2.0
* which is available at https://www.eclipse.org/legal/epl-2.0/
*
* SPDX-License-Identifier: EPL-2.0
*
* Contributors:
*     Eurotech - initial API and implementation
*******************************************************************************/
package org.eclipse.kapua.qa.common;

import java.nio.charset.StandardCharsets;

import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Stand-alone reproducer: connects one admin client and two device clients to the broker at
 * {@link #SERVER_URI}, subscribes each of them, publishes a fixed sequence of test messages and
 * lets {@link TestMqttClientCallback} print every callback to standard output.
 */
public class TestMqttClient {

    protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class);

    private static final String SERVER_URI = "tcp://localhost:1883";
    private static final String CLIENT_ID_ADMIN = "test-client-admin";
    private static final String CLIENT_ID_1 = "test-client-1";
    private static final String CLIENT_ID_2 = "test-client-2";
    private static final String USERNAME = "kapua-broker";
    private static final String PASSWORD = "kapua-password";
    private static final String USERNAME_ADMIN = "kapua-sys";
    private static final String PASSWORD_ADMIN = "kapua-password";

    /** Common prefix of every topic used by this test. */
    private static final String TOPIC_PREFIX = "$EDC/kapua-sys/";

    private TestMqttClient() {
    }

    /**
     * Runs the scenario: connect, subscribe, publish eight messages, disconnect.
     *
     * @param argv ignored
     * @throws MqttException if creating a client, connecting, subscribing or publishing fails
     */
    public static void main(String[] argv) throws MqttException {
        MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence());
        MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence());
        MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence());
        clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN));
        client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
        client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));

        try {
            clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN));
            client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
            client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
            System.out.println("waiting for messages");
            client1.subscribe(TOPIC_PREFIX + CLIENT_ID_1 + "/#");
            client2.subscribe(TOPIC_PREFIX + CLIENT_ID_2 + "/#");
            clientAdmin.subscribe("#");
            // MQTT-4.7.2-1: a filter starting with a wildcard must not match topic names that
            // begin with '$', so "#" alone cannot cover the $EDC namespace on a compliant broker.
            clientAdmin.subscribe("$EDC/#");

            publishTest(client1, CLIENT_ID_1, "/MQTT/BIRTH");
            publishTest(client2, CLIENT_ID_2, "/MQTT/BIRTH");
            publishTest(clientAdmin, CLIENT_ID_1, "/MQTT/APPS");
            // Originally a verbatim repeat of the previous publish (client 1); client 2 is the
            // target that mirrors the admin publishes of the /MQTT/DC group below.
            publishTest(clientAdmin, CLIENT_ID_2, "/MQTT/APPS");

            publishTest(client1, CLIENT_ID_1, "/MQTT/DC");
            publishTest(client2, CLIENT_ID_2, "/MQTT/DC");
            publishTest(clientAdmin, CLIENT_ID_1, "/MQTT/DC");
            publishTest(clientAdmin, CLIENT_ID_2, "/MQTT/DC");
        } finally {
            // Always release the connections, even when a step above failed.
            disconnectQuietly(clientAdmin);
            disconnectQuietly(client1);
            disconnectQuietly(client2);
        }
    }

    /**
     * Publishes the payload {@code "test"} to {@code $EDC/kapua-sys/<targetClientId><topicSuffix>}
     * and prints the {@code ===} separator line.
     *
     * @param publisher      client used to send the message
     * @param targetClientId client id embedded in the topic
     * @param topicSuffix    trailing part of the topic, starting with {@code /}
     * @throws MqttException if the publish fails
     */
    private static void publishTest(MqttClient publisher, String targetClientId, String topicSuffix) throws MqttException {
        // Explicit charset so the bytes on the wire do not depend on the platform default.
        MqttMessage message = new MqttMessage("test".getBytes(StandardCharsets.UTF_8));
        publisher.publish(TOPIC_PREFIX + targetClientId + topicSuffix, message);
        System.out.println("===");
    }

    /**
     * Disconnects the client if it is connected; a failure is logged instead of being rethrown so
     * that the remaining clients are still disconnected.
     *
     * @param client client to disconnect
     */
    private static void disconnectQuietly(MqttClient client) {
        try {
            if (client.isConnected()) {
                client.disconnect();
            }
        } catch (MqttException e) {
            logger.warn("Unable to disconnect client {}", client.getClientId(), e);
        }
    }

    /**
     * Builds clean-session connect options carrying the given credentials.
     *
     * @param username user name sent to the broker
     * @param password password sent to the broker
     * @return the populated options
     */
    private static MqttConnectOptions getMqttConnectOptions(String username, String password) {
        MqttConnectOptions options = new MqttConnectOptions();
        options.setCleanSession(true);
        options.setUserName(username);
        options.setPassword(password.toCharArray());
        return options;
    }
}

/**
 * {@link MqttCallback} that prints every event to standard output, prefixed with the id of the
 * client it was created for.
 */
class TestMqttClientCallback implements MqttCallback {

    /** Client id shown at the start of every printed line; never reassigned. */
    private final String clientId;

    /**
     * @param clientId id used to label the printed lines
     */
    TestMqttClientCallback(String clientId) {
        this.clientId = clientId;
    }

    /**
     * Prints the topic and the payload of a received message.
     *
     * @param topic   topic the message arrived on
     * @param message received message; its payload is decoded as UTF-8
     */
    @Override
    public void messageArrived(String topic, MqttMessage message) throws Exception {
        // Decode with an explicit charset instead of the platform default.
        String payload = new String(message.getPayload(), StandardCharsets.UTF_8);
        System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + payload);
    }

    /**
     * Prints the first topic reported by the delivery token.
     *
     * @param token token of the completed delivery
     */
    @Override
    public void deliveryComplete(IMqttDeliveryToken token) {
        System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]);
    }

    /**
     * Prints the message of the cause followed by its stack trace.
     *
     * @param cause reason reported for the lost connection
     */
    @Override
    public void connectionLost(Throwable cause) {
        System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage());
        cause.printStackTrace();
    }
}

Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

Posted by Justin Bertram <jb...@apache.org>.
> Now the $EDC/# is not working due to ARTEMIS-3801 right?

I believe that is the case, yes.


Justin

On Fri, Apr 29, 2022 at 2:40 AM Modanese, Riccardo
<Ri...@eurotech.com.invalid> wrote:

> From my understanding our previous subscription
>     “#”
> Should be changed into 2 different subscriptions:
>     “#”
>     “$EDC/#”
>
> Now the $EDC/# is not working due to ARTEMIS-3801 right?
> If so I think it’s not a big problem we should manage 2 different
> subscriptions instead of one.
>
> Thanks
>
>
> Da: Justin Bertram <jb...@apache.org>
> Data: martedì, 26 aprile 2022 22:24
> A: users@activemq.apache.org <us...@activemq.apache.org>
> Oggetto: Re: Artemis 2.21+ - MQTT devices don’t receive the messages
> matching their subscriptions
> I believe I see why this is working for you in ActiveMQ "Classic."
>
> By default ActiveMQ "Classic" will block any messages published to an MQTT
> topic starting with '$' [1]. However, a configuration element named
> "publishDollarTopics"
> was added [2] a long time ago to assist "legacy MQTT applications"
> (presumably those migrating from 3.1 to 3.1.1 since the 3.1 spec makes no
> mention of topics starting with '$'). You are actually setting this
> parameter in Kapua [3].
>
> A similar parameter could potentially be added to ActiveMQ Artemis, but
> it's been 8 years since MQTT 3.1.1 was released and 3 years since MQTT 5
> was released. Any application written against MQTT 3.1 has had ample time
> to be updated, and it's not like this is a gray area in the spec. Both
> 3.1.1 and 5 specifications are very clear about how to treat topics that
> start with '$'.
>
> Is it reasonable to expect that Kapua would change to be spec compliant?
>
>
> Justin
>
> [1]
>
> https://github.com/apache/activemq/blob/59dfbc3302c43054d24a96e699b0eaf1b6d1bf98/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java#L159
> [2] https://issues.apache.org/jira/browse/AMQ-5292
> [3]
>
> https://github.com/eclipse/kapua/blob/master/org.eclipse.kapua.assembly/src/main/resources/conf/broker/activemq.xml#L434
>
> On Tue, Apr 26, 2022 at 9:42 AM Justin Bertram <jb...@apache.org>
> wrote:
>
> > > It’s a huge problem for us since in Kapua we are doing # subscriptions
> > (for internal components) and using $ as topic prefix (also Kura is using
> > this in its namespace) and worked perfectly with ActiveMQ 5.x.
> >
> > This use-case is clearly prohibited by the MQTT specification. It
> > specifically says:
> >
> >   A subscription to “#” will not receive any messages published to a
> topic
> > beginning with a $
> >
> > If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
> > enforce the MQTT specification appropriately.
> >
> > > I should evaluate the impact but I’m not so sure we can move to Artemis
> > with this limitation.
> >
> > I'm not sure I would categorize this as a "limitation" unless you believe
> > enforcing the specification is a limitation.
> >
> > For what it's worth the MQTT specification also says:
> >
> >   Applications cannot use a topic with a leading $ character for their
> own
> > purposes
> >
> > At this point I recommend you change your application to conform with the
> > specification.
> >
> >
> > Justin
> >
> > On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
> > <Ri...@eurotech.com.invalid> wrote:
> >
> >> It’s a huge problem for us since in Kapua we are doing # subscriptions
> >> (for internal components) and using $ as topic prefix (also Kura is
> using
> >> this in its namespace) and worked perfectly with ActiveMQ 5.x.
> >>
> >> I should evaluate the impact but I’m not so sure we can move to Artemis
> >> with this limitation.
> >>
> >> Riccardo
> >>
> >> Da: Justin Bertram <jb...@apache.org>
> >> Data: martedì, 26 aprile 2022 00:18
> >> A: users@activemq.apache.org <us...@activemq.apache.org>
> >> Oggetto: Re: Artemis 2.21+ - MQTT devices don’t receive the messages
> >> matching their subscriptions
> >> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
> >>
> >> However, it's worth noting that your code will still not work as it did
> >> before because the previous behavior violated the MQTT 3.1.1
> >> specification.
> >> As noted previously, both MQTT 3.1.1 and 5 specifications contain
> >> MQTT-4.7.2-1 which states:
> >>
> >> > The Server MUST NOT match Topic Filters starting with a wildcard
> >> character (# or +) with Topic Names beginning with a $ character.
> >>
> >> Previously your "test-client-admin" client was subscribing to `#` and
> >> receiving messages from topics beginning with `$`. This won't work
> >> anymore.
> >>
> >>
> >> Justin
> >>
> >> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
> >>
> >> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
> >> <Ri...@eurotech.com.invalid> wrote:
> >>
> >> >
> >> > Hello,
> >> >     switching from Artemis broker 2.20 to 2.21 we experienced an issue
> >> > about message delivering.
> >> > It looks like MQTT devices don’t receive the messages matching their
> >> > subscriptions.
> >> >
> >> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> >> > (wildcard addresses modified as (**)) produces the correct output:
> >> > waiting for messages
> >> > Client: test-client-1 - Delivery completed:
> >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> >> > Client: test-client-admin - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> >> > Client: test-client-1 - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> >> > ===
> >> > Client: test-client-2 - Delivery completed:
> >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> >> > Client: test-client-2 - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> >> > Client: test-client-admin - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> >> > ===
> >> > Client: test-client-admin - Delivery completed:
> >> > $EDC/kapua-sys/test-client-1/MQTT/APPS
> >> > Client: test-client-admin - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> >> > Client: test-client-1 - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> >> > …
> >> >
> >> > With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> >> > target the output is:
> >> > waiting for messages
> >> > ===
> >> > Client: test-client-1 - Delivery completed:
> >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> >> > ===
> >> > Client: test-client-2 - Delivery completed:
> >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> >> > ===
> >> > Client: test-client-admin - Delivery completed:
> >> > $EDC/kapua-sys/test-client-1/MQTT/APPS
> >> > …
> >> >
> >> > So the broker doesn’t send any message to the clients.
> >> >
> >> > May be we missed to configure something needed by 2.21 versions
> onward?
> >> >
> >> > Regards,
> >> > Riccardo Modanese
> >> >
> >> >
> >> >
> >> > (*) The 2.21 and 2.22 default broker.xml configuration file has
> changed
> >> in
> >> > this way:
> >> > set the broker name (message-broker)
> >> > removed double connector bound to 1883 (the broker with the default
> >> > configuration crashed)
> >> > allow only MQTT protocol for connector bound to 1883 port
> >> > removed broadcast connector and configuration
> >> > added custom wildcard configuration (**)
> >> >
> >> > (**) <wildcard-addresses>
> >> >          <routing-enabled>true</routing-enabled>
> >> >          <delimiter>/</delimiter>
> >> >           <any-words>#</any-words>
> >> >           <single-word>+</single-word>
> >> >       </wildcard-addresses>
> >> >
> >> > (***)
> >> >
> >> >
> >>
> /*******************************************************************************
> >> > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> >> > *
> >> > * This program and the accompanying materials are made
> >> > * available under the terms of the Eclipse Public License 2.0
> >> > * which is available at https://www.eclipse.org/legal/epl-2.0/
> >> > *
> >> > * SPDX-License-Identifier: EPL-2.0
> >> > *
> >> > * Contributors:
> >> > *     Eurotech - initial API and implementation
> >> >
> >> >
> >>
> *******************************************************************************/
> >> > package org.eclipse.kapua.qa.common;
> >> >
> >> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> >> > import org.eclipse.paho.client.mqttv3.MqttCallback;
> >> > import org.eclipse.paho.client.mqttv3.MqttClient;
> >> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> >> > import org.eclipse.paho.client.mqttv3.MqttException;
> >> > import org.eclipse.paho.client.mqttv3.MqttMessage;
> >> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> >> > import org.slf4j.Logger;
> >> > import org.slf4j.LoggerFactory;
> >> >
> >> > public class TestMqttClient {
> >> >
> >> >     protected static Logger logger =
> >> > LoggerFactory.getLogger(TestMqttClient.class);
> >> >
> >> >     private static final String SERVER_URI = "tcp://localhost:1883";
> >> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
> >> >     private static final String CLIENT_ID_1 = "test-client-1";
> >> >     private static final String CLIENT_ID_2 = "test-client-2";
> >> >     private static final String USERNAME = "kapua-broker";
> >> >     private static final String PASSWORD = "kapua-password";
> >> >     private static final String USERNAME_ADMIN = "kapua-sys";
> >> >     private static final String PASSWORD_ADMIN = "kapua-password";
> >> >
> >> >     private TestMqttClient() {
> >> >     }
> >> >
> >> >     public static void main(String argv[]) throws MqttException {
> >> >         MqttClient clientAdmin = new MqttClient(SERVER_URI,
> >> > CLIENT_ID_ADMIN, new MemoryPersistence());
> >> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1,
> new
> >> > MemoryPersistence());
> >> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2,
> new
> >> > MemoryPersistence());
> >> >         clientAdmin.setCallback(new
> >> > TestMqttClientCallback(CLIENT_ID_ADMIN));
> >> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
> >> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
> >> >
> >> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
> >> > PASSWORD_ADMIN));
> >> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >> >         System.out.println("waiting for messages");
> >> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
> >> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
> >> >         clientAdmin.subscribe("#");
> >> >
> >> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/BIRTH",
> >> > new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 +
> "/MQTT/BIRTH",
> >> > new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> >> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> >> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >
> >> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
> >> new
> >> > MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
> >> new
> >> > MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> >> "/MQTT/DC",
> >> > new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 +
> >> "/MQTT/DC",
> >> > new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >
> >> >         clientAdmin.disconnect();
> >> >         client1.disconnect();
> >> >         client2.disconnect();
> >> >     }
> >> >
> >> >     private static MqttConnectOptions getMqttConnectOptions(String
> >> > username, String password) {
> >> >         MqttConnectOptions options = new MqttConnectOptions();
> >> >         options.setCleanSession(true);
> >> >         options.setUserName(username);
> >> >         options.setPassword(password.toCharArray());
> >> >         return options;
> >> >     }
> >> > }
> >> >
> >> > class TestMqttClientCallback implements MqttCallback {
> >> >
> >> >     private String clientId;
> >> >
> >> >     TestMqttClientCallback(String clientId) {
> >> >         this.clientId = clientId;
> >> >     }
> >> >
> >> >     @Override
> >> >     public void messageArrived(String topic, MqttMessage message)
> throws
> >> > Exception {
> >> >         System.out.println("Client: " + clientId + " - Message arrived
> >> on
> >> > topic: " + topic + " - message: " + new String(message.getPayload()));
> >> >     }
> >> >
> >> >     @Override
> >> >     public void deliveryComplete(IMqttDeliveryToken token) {
> >> >         System.out.println("Client: " + clientId + " - Delivery
> >> completed:
> >> > " + token.getTopics()[0]);
> >> >     }
> >> >
> >> >     @Override
> >> >     public void connectionLost(Throwable cause) {
> >> >         System.out.println("Client: " + clientId + " - Connection
> lost:
> >> "
> >> > + cause.getMessage());
> >> >         cause.printStackTrace();
> >> >     }
> >> > }
> >> >
> >>
> >
>

R: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

Posted by "Modanese, Riccardo" <Ri...@eurotech.com.INVALID>.
From my understanding our previous subscription
    “#”
Should be changed into 2 different subscriptions:
    “#”
    “$EDC/#”

Now the $EDC/# is not working due to ARTEMIS-3801 right?
If so, I think it’s not a big problem: we would simply manage 2 different subscriptions instead of one.

Thanks


Da: Justin Bertram <jb...@apache.org>
Data: martedì, 26 aprile 2022 22:24
A: users@activemq.apache.org <us...@activemq.apache.org>
Oggetto: Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions
I believe I see why this is working for you in ActiveMQ "Classic."

By default ActiveMQ "Classic" will block any messages published to an MQTT
topic starting with '$' [1]. However, a configuration element named
"publishDollarTopics"
was added [2] a long time ago to assist "legacy MQTT applications"
(presumably those migrating from 3.1 to 3.1.1 since the 3.1 spec makes no
mention of topics starting with '$'). You are actually setting this
parameter in Kapua [3].

A similar parameter could potentially be added to ActiveMQ Artemis, but
it's been 8 years since MQTT 3.1.1 was released and 3 years since MQTT 5
was released. Any application written against MQTT 3.1 has had ample time
to be updated, and it's not like this is a gray area in the spec. Both
3.1.1 and 5 specifications are very clear about how to treat topics that
start with '$'.

Is it reasonable to expect that Kapua would change to be spec compliant?


Justin

[1]
https://github.com/apache/activemq/blob/59dfbc3302c43054d24a96e699b0eaf1b6d1bf98/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java#L159
[2] https://issues.apache.org/jira/browse/AMQ-5292
[3]
https://github.com/eclipse/kapua/blob/master/org.eclipse.kapua.assembly/src/main/resources/conf/broker/activemq.xml#L434

On Tue, Apr 26, 2022 at 9:42 AM Justin Bertram <jb...@apache.org> wrote:

> > It’s a huge problem for us since in Kapua we are doing # subscriptions
> (for internal components) and using $ as topic prefix (also Kura is using
> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>
> This use-case is clearly prohibited by the MQTT specification. It
> specifically says:
>
>   A subscription to “#” will not receive any messages published to a topic
> beginning with a $
>
> If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
> enforce the MQTT specification appropriately.
>
> > I should evaluate the impact but I’m not so sure we can move to Artemis
> with this limitation.
>
> I'm not sure I would categorize this as a "limitation" unless you believe
> enforcing the specification is a limitation.
>
> For what it's worth the MQTT specification also says:
>
>   Applications cannot use a topic with a leading $ character for their own
> purposes
>
> At this point I recommend you change your application to conform with the
> specification.
>
>
> Justin
>
> On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
> <Ri...@eurotech.com.invalid> wrote:
>
>> It’s a huge problem for us since in Kapua we are doing # subscriptions
>> (for internal components) and using $ as topic prefix (also Kura is using
>> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>>
>> I should evaluate the impact but I’m not so sure we can move to Artemis
>> with this limitation.
>>
>> Riccardo
>>
>> Da: Justin Bertram <jb...@apache.org>
>> Data: martedì, 26 aprile 2022 00:18
>> A: users@activemq.apache.org <us...@activemq.apache.org>
>> Oggetto: Re: Artemis 2.21+ - MQTT devices don’t receive the messages
>> matching their subscriptions
>> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
>>
>> However, it's worth noting that your code will still not work as it did
>> before because the previous behavior violated the MQTT 3.1.1
>> specification.
>> As noted previously, both MQTT 3.1.1 and 5 specifications contain
>> MQTT-4.7.2-1 which states:
>>
>> > The Server MUST NOT match Topic Filters starting with a wildcard
>> character (# or +) with Topic Names beginning with a $ character.
>>
>> Previously your "test-client-admin" client was subscribing to `#` and
>> receiving messages from topics beginning with `$`. This won't work
>> anymore.
>>
>>
>> Justin
>>
>> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
>>
>> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
>> <Ri...@eurotech.com.invalid> wrote:
>>
>> >
>> > Hello,
>> >     switching from Artemis broker 2.20 to 2.21 we experienced an issue
>> > about message delivering.
>> > It looks like MQTT devices don’t receive the messages matching their
>> > subscriptions.
>> >
>> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target
>> > (wildcard addresses modified as (**)) produces the correct output:
>> > waiting for messages
>> > Client: test-client-1 - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
>> > Client: test-client-1 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
>> > ===
>> > Client: test-client-2 - Delivery completed:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
>> > Client: test-client-2 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
>> > ===
>> > Client: test-client-admin - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
>> > Client: test-client-1 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
>> > …
>> >
>> > With broker 2.21 or 2.22 (configuration changes described in (*) ) as
>> > target the output is:
>> > waiting for messages
>> > ===
>> > Client: test-client-1 - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
>> > ===
>> > Client: test-client-2 - Delivery completed:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
>> > ===
>> > Client: test-client-admin - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS
>> > …
>> >
>> > So the broker doesn’t send any message to the clients.
>> >
>> > May be we missed to configure something needed by 2.21 versions onward?
>> >
>> > Regards,
>> > Riccardo Modanese
>> >
>> >
>> >
>> > (*) The 2.21 and 2.22 default broker.xml configuration file has changed
>> in
>> > this way:
>> > set the broker name (message-broker)
>> > removed double connector bound to 1883 (the broker with the default
>> > configuration crashed)
>> > allow only MQTT protocol for connector bound to 1883 port
>> > removed broadcast connector and configuration
>> > added custom wildcard configuration (**)
>> >
>> > (**) <wildcard-addresses>
>> >          <routing-enabled>true</routing-enabled>
>> >          <delimiter>/</delimiter>
>> >           <any-words>#</any-words>
>> >           <single-word>+</single-word>
>> >       </wildcard-addresses>
>> >
>> > (***)
>> >
>> >
>> /*******************************************************************************
>> > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
>> > *
>> > * This program and the accompanying materials are made
>> > * available under the terms of the Eclipse Public License 2.0
>> > * which is available at https://www.eclipse.org/legal/epl-2.0/
>> > *
>> > * SPDX-License-Identifier: EPL-2.0
>> > *
>> > * Contributors:
>> > *     Eurotech - initial API and implementation
>> >
>> >
>> *******************************************************************************/
>> > package org.eclipse.kapua.qa.common;
>> >
>> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
>> > import org.eclipse.paho.client.mqttv3.MqttCallback;
>> > import org.eclipse.paho.client.mqttv3.MqttClient;
>> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
>> > import org.eclipse.paho.client.mqttv3.MqttException;
>> > import org.eclipse.paho.client.mqttv3.MqttMessage;
>> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >
>> > public class TestMqttClient {
>> >
>> >     protected static Logger logger =
>> > LoggerFactory.getLogger(TestMqttClient.class);
>> >
>> >     private static final String SERVER_URI = "tcp://localhost:1883";
>> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>> >     private static final String CLIENT_ID_1 = "test-client-1";
>> >     private static final String CLIENT_ID_2 = "test-client-2";
>> >     private static final String USERNAME = "kapua-broker";
>> >     private static final String PASSWORD = "kapua-password";
>> >     private static final String USERNAME_ADMIN = "kapua-sys";
>> >     private static final String PASSWORD_ADMIN = "kapua-password";
>> >
>> >     private TestMqttClient() {
>> >     }
>> >
>> >     public static void main(String argv[]) throws MqttException {
>> >         MqttClient clientAdmin = new MqttClient(SERVER_URI,
>> > CLIENT_ID_ADMIN, new MemoryPersistence());
>> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new
>> > MemoryPersistence());
>> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new
>> > MemoryPersistence());
>> >         clientAdmin.setCallback(new
>> > TestMqttClientCallback(CLIENT_ID_ADMIN));
>> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>> >
>> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
>> > PASSWORD_ADMIN));
>> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>> >         System.out.println("waiting for messages");
>> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>> >         clientAdmin.subscribe("#");
>> >
>> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
>> > new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
>> > new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
>> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
>> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >
>> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
>> new
>> > MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
>> new
>> > MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
>> "/MQTT/DC",
>> > new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 +
>> "/MQTT/DC",
>> > new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >
>> >         clientAdmin.disconnect();
>> >         client1.disconnect();
>> >         client2.disconnect();
>> >     }
>> >
>> >     private static MqttConnectOptions getMqttConnectOptions(String
>> > username, String password) {
>> >         MqttConnectOptions options = new MqttConnectOptions();
>> >         options.setCleanSession(true);
>> >         options.setUserName(username);
>> >         options.setPassword(password.toCharArray());
>> >         return options;
>> >     }
>> > }
>> >
>> > class TestMqttClientCallback implements MqttCallback {
>> >
>> >     private String clientId;
>> >
>> >     TestMqttClientCallback(String clientId) {
>> >         this.clientId = clientId;
>> >     }
>> >
>> >     @Override
>> >     public void messageArrived(String topic, MqttMessage message) throws
>> > Exception {
>> >         System.out.println("Client: " + clientId + " - Message arrived
>> on
>> > topic: " + topic + " - message: " + new String(message.getPayload()));
>> >     }
>> >
>> >     @Override
>> >     public void deliveryComplete(IMqttDeliveryToken token) {
>> >         System.out.println("Client: " + clientId + " - Delivery
>> completed:
>> > " + token.getTopics()[0]);
>> >     }
>> >
>> >     @Override
>> >     public void connectionLost(Throwable cause) {
>> >         System.out.println("Client: " + clientId + " - Connection lost:
>> "
>> > + cause.getMessage());
>> >         cause.printStackTrace();
>> >     }
>> > }
>> >
>>
>

Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

Posted by Justin Bertram <jb...@apache.org>.
I believe I see why this is working for you in ActiveMQ "Classic."

By default ActiveMQ "Classic" will block any messages published to an MQTT
topic starting with '$' [1]. However, a configuration element named
"publishDollarTopics" was added [2] a long time ago to assist "legacy MQTT
applications"
(presumably those migrating from 3.1 to 3.1.1 since the 3.1 spec makes no
mention of topics starting with '$'). You are actually setting this
parameter in Kapua [3].

A similar parameter could potentially be added to ActiveMQ Artemis, but
it's been 8 years since MQTT 3.1.1 was released and 3 years since MQTT 5
was released. Any application written against MQTT 3.1 has had ample time
to be updated, and it's not like this is a gray area in the spec. Both
3.1.1 and 5 specifications are very clear about how to treat topics that
start with '$'.

Is it reasonable to expect that Kapua would change to be spec compliant?


Justin

[1]
https://github.com/apache/activemq/blob/59dfbc3302c43054d24a96e699b0eaf1b6d1bf98/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java#L159
[2] https://issues.apache.org/jira/browse/AMQ-5292
[3]
https://github.com/eclipse/kapua/blob/master/org.eclipse.kapua.assembly/src/main/resources/conf/broker/activemq.xml#L434

On Tue, Apr 26, 2022 at 9:42 AM Justin Bertram <jb...@apache.org> wrote:

> > It’s a huge problem for us since in Kapua we are doing # subscriptions
> (for internal components) and using $ as topic prefix (also Kura is using
> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>
> This use-case is clearly prohibited by the MQTT specification. It
> specifically says:
>
>   A subscription to “#” will not receive any messages published to a topic
> beginning with a $
>
> If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
> enforce the MQTT specification appropriately.
>
> > I should evaluate the impact but I’m not so sure we can move to Artemis
> with this limitation.
>
> I'm not sure I would categorize this as a "limitation" unless you believe
> enforcing the specification is a limitation.
>
> For what it's worth the MQTT specification also says:
>
>   Applications cannot use a topic with a leading $ character for their own
> purposes
>
> At this point I recommend you change your application to conform with the
> specification.
>
>
> Justin
>
> On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
> <Ri...@eurotech.com.invalid> wrote:
>
>> It’s a huge problem for us since in Kapua we are doing # subscriptions
>> (for internal components) and using $ as topic prefix (also Kura is using
>> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>>
>> I should evaluate the impact but I’m not so sure we can move to Artemis
>> with this limitation.
>>
>> Riccardo
>>
>> Da: Justin Bertram <jb...@apache.org>
>> Data: martedì, 26 aprile 2022 00:18
>> A: users@activemq.apache.org <us...@activemq.apache.org>
>> Oggetto: Re: Artemis 2.21+ - MQTT devices don’t receive the messages
>> matching their subscriptions
>> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
>>
>> However, it's worth noting that your code will still not work as it did
>> before because the previous behavior violated the MQTT 3.1.1
>> specification.
>> As noted previously, both MQTT 3.1.1 and 5 specifications contain
>> MQTT-4.7.2-1 which states:
>>
>> > The Server MUST NOT match Topic Filters starting with a wildcard
>> character (# or +) with Topic Names beginning with a $ character.
>>
>> Previously your "test-client-admin" client was subscribing to `#` and
>> receiving messages from topics beginning with `$`. This won't work
>> anymore.
>>
>>
>> Justin
>>
>> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
>>
>> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
>> <Ri...@eurotech.com.invalid> wrote:
>>
>> >
>> > Hello,
>> >     switching from Artemis broker 2.20 to 2.21 we experienced an issue
>> > about message delivering.
>> > It looks like MQTT devices don’t receive the messages matching their
>> > subscriptions.
>> >
>> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target
>> > (wildcard addresses modified as (**)) produces the correct output:
>> > waiting for messages
>> > Client: test-client-1 - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
>> > Client: test-client-1 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
>> > ===
>> > Client: test-client-2 - Delivery completed:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
>> > Client: test-client-2 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
>> > ===
>> > Client: test-client-admin - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS
>> > Client: test-client-admin - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
>> > Client: test-client-1 - Message arrived on topic:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
>> > …
>> >
>> > With broker 2.21 or 2.22 (configuration changes described in (*) ) as
>> > target the output is:
>> > waiting for messages
>> > ===
>> > Client: test-client-1 - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
>> > ===
>> > Client: test-client-2 - Delivery completed:
>> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
>> > ===
>> > Client: test-client-admin - Delivery completed:
>> > $EDC/kapua-sys/test-client-1/MQTT/APPS
>> > …
>> >
>> > So the broker doesn’t send any message to the clients.
>> >
>> > May be we missed to configure something needed by 2.21 versions onward?
>> >
>> > Regards,
>> > Riccardo Modanese
>> >
>> >
>> >
>> > (*) The 2.21 and 2.22 default broker.xml configuration file has changed
>> in
>> > this way:
>> > set the broker name (message-broker)
>> > removed double connector bound to 1883 (the broker with the default
>> > configuration crashed)
>> > allow only MQTT protocol for connector bound to 1883 port
>> > removed broadcast connector and configuration
>> > added custom wildcard configuration (**)
>> >
>> > (**) <wildcard-addresses>
>> >          <routing-enabled>true</routing-enabled>
>> >          <delimiter>/</delimiter>
>> >           <any-words>#</any-words>
>> >           <single-word>+</single-word>
>> >       </wildcard-addresses>
>> >
>> > (***)
>> >
>> >
>> /*******************************************************************************
>> > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
>> > *
>> > * This program and the accompanying materials are made
>> > * available under the terms of the Eclipse Public License 2.0
>> > * which is available at https://www.eclipse.org/legal/epl-2.0/
>> > *
>> > * SPDX-License-Identifier: EPL-2.0
>> > *
>> > * Contributors:
>> > *     Eurotech - initial API and implementation
>> >
>> >
>> *******************************************************************************/
>> > package org.eclipse.kapua.qa.common;
>> >
>> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
>> > import org.eclipse.paho.client.mqttv3.MqttCallback;
>> > import org.eclipse.paho.client.mqttv3.MqttClient;
>> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
>> > import org.eclipse.paho.client.mqttv3.MqttException;
>> > import org.eclipse.paho.client.mqttv3.MqttMessage;
>> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
>> > import org.slf4j.Logger;
>> > import org.slf4j.LoggerFactory;
>> >
>> > public class TestMqttClient {
>> >
>> >     protected static Logger logger =
>> > LoggerFactory.getLogger(TestMqttClient.class);
>> >
>> >     private static final String SERVER_URI = "tcp://localhost:1883";
>> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>> >     private static final String CLIENT_ID_1 = "test-client-1";
>> >     private static final String CLIENT_ID_2 = "test-client-2";
>> >     private static final String USERNAME = "kapua-broker";
>> >     private static final String PASSWORD = "kapua-password";
>> >     private static final String USERNAME_ADMIN = "kapua-sys";
>> >     private static final String PASSWORD_ADMIN = "kapua-password";
>> >
>> >     private TestMqttClient() {
>> >     }
>> >
>> >     public static void main(String argv[]) throws MqttException {
>> >         MqttClient clientAdmin = new MqttClient(SERVER_URI,
>> > CLIENT_ID_ADMIN, new MemoryPersistence());
>> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new
>> > MemoryPersistence());
>> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new
>> > MemoryPersistence());
>> >         clientAdmin.setCallback(new
>> > TestMqttClientCallback(CLIENT_ID_ADMIN));
>> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>> >
>> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
>> > PASSWORD_ADMIN));
>> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>> >         System.out.println("waiting for messages");
>> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>> >         clientAdmin.subscribe("#");
>> >
>> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
>> > new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
>> > new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
>> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
>> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >
>> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
>> new
>> > MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
>> new
>> > MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
>> "/MQTT/DC",
>> > new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 +
>> "/MQTT/DC",
>> > new MqttMessage("test".getBytes()));
>> >         System.out.println("===");
>> >
>> >         clientAdmin.disconnect();
>> >         client1.disconnect();
>> >         client2.disconnect();
>> >     }
>> >
>> >     private static MqttConnectOptions getMqttConnectOptions(String
>> > username, String password) {
>> >         MqttConnectOptions options = new MqttConnectOptions();
>> >         options.setCleanSession(true);
>> >         options.setUserName(username);
>> >         options.setPassword(password.toCharArray());
>> >         return options;
>> >     }
>> > }
>> >
>> > class TestMqttClientCallback implements MqttCallback {
>> >
>> >     private String clientId;
>> >
>> >     TestMqttClientCallback(String clientId) {
>> >         this.clientId = clientId;
>> >     }
>> >
>> >     @Override
>> >     public void messageArrived(String topic, MqttMessage message) throws
>> > Exception {
>> >         System.out.println("Client: " + clientId + " - Message arrived
>> on
>> > topic: " + topic + " - message: " + new String(message.getPayload()));
>> >     }
>> >
>> >     @Override
>> >     public void deliveryComplete(IMqttDeliveryToken token) {
>> >         System.out.println("Client: " + clientId + " - Delivery
>> completed:
>> > " + token.getTopics()[0]);
>> >     }
>> >
>> >     @Override
>> >     public void connectionLost(Throwable cause) {
>> >         System.out.println("Client: " + clientId + " - Connection lost:
>> "
>> > + cause.getMessage());
>> >         cause.printStackTrace();
>> >     }
>> > }
>> >
>>
>

Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

Posted by Justin Bertram <jb...@apache.org>.
> It’s a huge problem for us since in Kapua we are doing # subscriptions
(for internal components) and using $ as topic prefix (also Kura is using
this in its namespace) and worked perfectly with ActiveMQ 5.x.

This use-case is clearly prohibited by the MQTT specification. It
specifically says:

  A subscription to “#” will not receive any messages published to a topic
beginning with a $

If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
enforce the MQTT specification appropriately.

> I should evaluate the impact but I’m not so sure we can move to Artemis
with this limitation.

I'm not sure I would categorize this as a "limitation" unless you believe
enforcing the specification is a limitation.

For what it's worth the MQTT specification also says:

  Applications cannot use a topic with a leading $ character for their own
purposes

At this point I recommend you change your application to conform with the
specification.


Justin

On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
<Ri...@eurotech.com.invalid> wrote:

> It’s a huge problem for us since in Kapua we are doing # subscriptions
> (for internal components) and using $ as topic prefix (also Kura is using
> this in its namespace) and worked perfectly with ActiveMQ 5.x.
>
> I should evaluate the impact but I’m not so sure we can move to Artemis
> with this limitation.
>
> Riccardo
>
> Da: Justin Bertram <jb...@apache.org>
> Data: martedì, 26 aprile 2022 00:18
> A: users@activemq.apache.org <us...@activemq.apache.org>
> Oggetto: Re: Artemis 2.21+ - MQTT devices don’t receive the messages
> matching their subscriptions
> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
>
> However, it's worth noting that your code will still not work as it did
> before because the previous behavior violated the MQTT 3.1.1 specification.
> As noted previously, both MQTT 3.1.1 and 5 specifications contain
> MQTT-4.7.2-1 which states:
>
> > The Server MUST NOT match Topic Filters starting with a wildcard
> character (# or +) with Topic Names beginning with a $ character.
>
> Previously your "test-client-admin" client was subscribing to `#` and
> receiving messages from topics beginning with `$`. This won't work anymore.
>
>
> Justin
>
> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
>
> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
> <Ri...@eurotech.com.invalid> wrote:
>
> >
> > Hello,
> >     switching from Artemis broker 2.20 to 2.21 we experienced an issue
> > about message delivering.
> > It looks like MQTT devices don’t receive the messages matching their
> > subscriptions.
> >
> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> > (wildcard addresses modified as (**)) produces the correct output:
> > waiting for messages
> > Client: test-client-1 - Delivery completed:
> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> > Client: test-client-admin - Message arrived on topic:
> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> > Client: test-client-1 - Message arrived on topic:
> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> > ===
> > Client: test-client-2 - Delivery completed:
> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> > Client: test-client-2 - Message arrived on topic:
> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> > Client: test-client-admin - Message arrived on topic:
> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> > ===
> > Client: test-client-admin - Delivery completed:
> > $EDC/kapua-sys/test-client-1/MQTT/APPS
> > Client: test-client-admin - Message arrived on topic:
> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> > Client: test-client-1 - Message arrived on topic:
> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> > …
> >
> > With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> > target the output is:
> > waiting for messages
> > ===
> > Client: test-client-1 - Delivery completed:
> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> > ===
> > Client: test-client-2 - Delivery completed:
> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> > ===
> > Client: test-client-admin - Delivery completed:
> > $EDC/kapua-sys/test-client-1/MQTT/APPS
> > …
> >
> > So the broker doesn’t send any message to the clients.
> >
> > May be we missed to configure something needed by 2.21 versions onward?
> >
> > Regards,
> > Riccardo Modanese
> >
> >
> >
> > (*) The 2.21 and 2.22 default broker.xml configuration file has changed
> in
> > this way:
> > set the broker name (message-broker)
> > removed double connector bound to 1883 (the broker with the default
> > configuration crashed)
> > allow only MQTT protocol for connector bound to 1883 port
> > removed broadcast connector and configuration
> > added custom wildcard configuration (**)
> >
> > (**) <wildcard-addresses>
> >          <routing-enabled>true</routing-enabled>
> >          <delimiter>/</delimiter>
> >           <any-words>#</any-words>
> >           <single-word>+</single-word>
> >       </wildcard-addresses>
> >
> > (***)
> >
> >
> /*******************************************************************************
> > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> > *
> > * This program and the accompanying materials are made
> > * available under the terms of the Eclipse Public License 2.0
> > * which is available at https://www.eclipse.org/legal/epl-2.0/
> > *
> > * SPDX-License-Identifier: EPL-2.0
> > *
> > * Contributors:
> > *     Eurotech - initial API and implementation
> >
> >
> *******************************************************************************/
> > package org.eclipse.kapua.qa.common;
> >
> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> > import org.eclipse.paho.client.mqttv3.MqttCallback;
> > import org.eclipse.paho.client.mqttv3.MqttClient;
> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> > import org.eclipse.paho.client.mqttv3.MqttException;
> > import org.eclipse.paho.client.mqttv3.MqttMessage;
> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> > import org.slf4j.Logger;
> > import org.slf4j.LoggerFactory;
> >
> > public class TestMqttClient {
> >
> >     protected static Logger logger =
> > LoggerFactory.getLogger(TestMqttClient.class);
> >
> >     private static final String SERVER_URI = "tcp://localhost:1883";
> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
> >     private static final String CLIENT_ID_1 = "test-client-1";
> >     private static final String CLIENT_ID_2 = "test-client-2";
> >     private static final String USERNAME = "kapua-broker";
> >     private static final String PASSWORD = "kapua-password";
> >     private static final String USERNAME_ADMIN = "kapua-sys";
> >     private static final String PASSWORD_ADMIN = "kapua-password";
> >
> >     private TestMqttClient() {
> >     }
> >
> >     public static void main(String argv[]) throws MqttException {
> >         MqttClient clientAdmin = new MqttClient(SERVER_URI,
> > CLIENT_ID_ADMIN, new MemoryPersistence());
> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new
> > MemoryPersistence());
> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new
> > MemoryPersistence());
> >         clientAdmin.setCallback(new
> > TestMqttClientCallback(CLIENT_ID_ADMIN));
> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
> >
> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
> > PASSWORD_ADMIN));
> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >         System.out.println("waiting for messages");
> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
> >         clientAdmin.subscribe("#");
> >
> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
> > new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
> > new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >
> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new
> > MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new
> > MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
> > new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
> > new MqttMessage("test".getBytes()));
> >         System.out.println("===");
> >
> >         clientAdmin.disconnect();
> >         client1.disconnect();
> >         client2.disconnect();
> >     }
> >
> >     private static MqttConnectOptions getMqttConnectOptions(String
> > username, String password) {
> >         MqttConnectOptions options = new MqttConnectOptions();
> >         options.setCleanSession(true);
> >         options.setUserName(username);
> >         options.setPassword(password.toCharArray());
> >         return options;
> >     }
> > }
> >
> > class TestMqttClientCallback implements MqttCallback {
> >
> >     private String clientId;
> >
> >     TestMqttClientCallback(String clientId) {
> >         this.clientId = clientId;
> >     }
> >
> >     @Override
> >     public void messageArrived(String topic, MqttMessage message) throws
> > Exception {
> >         System.out.println("Client: " + clientId + " - Message arrived on
> > topic: " + topic + " - message: " + new String(message.getPayload()));
> >     }
> >
> >     @Override
> >     public void deliveryComplete(IMqttDeliveryToken token) {
> >         System.out.println("Client: " + clientId + " - Delivery
> completed:
> > " + token.getTopics()[0]);
> >     }
> >
> >     @Override
> >     public void connectionLost(Throwable cause) {
> >         System.out.println("Client: " + clientId + " - Connection lost: "
> > + cause.getMessage());
> >         cause.printStackTrace();
> >     }
> > }
> >
>

R: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

Posted by "Modanese, Riccardo" <Ri...@eurotech.com.INVALID>.
It’s a huge problem for us, since in Kapua we use # subscriptions (for internal components) and $ as a topic prefix (Kura also uses this in its namespace), and this worked perfectly with ActiveMQ 5.x.

I need to evaluate the impact, but I’m not sure we can move to Artemis with this limitation.

Riccardo

Da: Justin Bertram <jb...@apache.org>
Data: martedì, 26 aprile 2022 00:18
A: users@activemq.apache.org <us...@activemq.apache.org>
Oggetto: Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions
I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.

However, it's worth noting that your code will still not work as it did
before because the previous behavior violated the MQTT 3.1.1 specification.
As noted previously, both MQTT 3.1.1 and 5 specifications contain
MQTT-4.7.2-1 which states:

> The Server MUST NOT match Topic Filters starting with a wildcard
character (# or +) with Topic Names beginning with a $ character.

Previously your "test-client-admin" client was subscribing to `#` and
receiving messages from topics beginning with `$`. This won't work anymore.


Justin

[1] https://issues.apache.org/jira/browse/ARTEMIS-3801

On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
<Ri...@eurotech.com.invalid> wrote:

>
> Hello,
>     switching from Artemis broker 2.20 to 2.21 we experienced an issue
> about message delivering.
> It looks like MQTT devices don’t receive the messages matching their
> subscriptions.
>
> The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> (wildcard addresses modified as (**)) produces the correct output:
> waiting for messages
> Client: test-client-1 - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> Client: test-client-1 - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> ===
> Client: test-client-2 - Delivery completed:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> Client: test-client-2 - Message arrived on topic:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> ===
> Client: test-client-admin - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/APPS
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> Client: test-client-1 - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> …
>
> With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> target the output is:
> waiting for messages
> ===
> Client: test-client-1 - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> ===
> Client: test-client-2 - Delivery completed:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> ===
> Client: test-client-admin - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/APPS
> …
>
> So the broker doesn’t send any message to the clients.
>
> May be we missed to configure something needed by 2.21 versions onward?
>
> Regards,
> Riccardo Modanese
>
>
>
> (*) The 2.21 and 2.22 default broker.xml configuration file has changed in
> this way:
> set the broker name (message-broker)
> removed double connector bound to 1883 (the broker with the default
> configuration crashed)
> allow only MQTT protocol for connector bound to 1883 port
> removed broadcast connector and configuration
> added custom wildcard configuration (**)
>
> (**) <wildcard-addresses>
>          <routing-enabled>true</routing-enabled>
>          <delimiter>/</delimiter>
>           <any-words>#</any-words>
>           <single-word>+</single-word>
>       </wildcard-addresses>
>
> (***)
>
> /*******************************************************************************
> * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> *
> * This program and the accompanying materials are made
> * available under the terms of the Eclipse Public License 2.0
> * which is available at https://www.eclipse.org/legal/epl-2.0/
> *
> * SPDX-License-Identifier: EPL-2.0
> *
> * Contributors:
> *     Eurotech - initial API and implementation
>
> *******************************************************************************/
> package org.eclipse.kapua.qa.common;
>
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class TestMqttClient {
>
>     protected static Logger logger =
> LoggerFactory.getLogger(TestMqttClient.class);
>
>     private static final String SERVER_URI = "tcp://localhost:1883";
>     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>     private static final String CLIENT_ID_1 = "test-client-1";
>     private static final String CLIENT_ID_2 = "test-client-2";
>     private static final String USERNAME = "kapua-broker";
>     private static final String PASSWORD = "kapua-password";
>     private static final String USERNAME_ADMIN = "kapua-sys";
>     private static final String PASSWORD_ADMIN = "kapua-password";
>
>     private TestMqttClient() {
>     }
>
>     public static void main(String argv[]) throws MqttException {
>         MqttClient clientAdmin = new MqttClient(SERVER_URI,
> CLIENT_ID_ADMIN, new MemoryPersistence());
>         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new
> MemoryPersistence());
>         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new
> MemoryPersistence());
>         clientAdmin.setCallback(new
> TestMqttClientCallback(CLIENT_ID_ADMIN));
>         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>
>         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
> PASSWORD_ADMIN));
>         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         System.out.println("waiting for messages");
>         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>         clientAdmin.subscribe("#");
>
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new
> MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new
> MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>
>         clientAdmin.disconnect();
>         client1.disconnect();
>         client2.disconnect();
>     }
>
>     private static MqttConnectOptions getMqttConnectOptions(String
> username, String password) {
>         MqttConnectOptions options = new MqttConnectOptions();
>         options.setCleanSession(true);
>         options.setUserName(username);
>         options.setPassword(password.toCharArray());
>         return options;
>     }
> }
>
> class TestMqttClientCallback implements MqttCallback {
>
>     private String clientId;
>
>     TestMqttClientCallback(String clientId) {
>         this.clientId = clientId;
>     }
>
>     @Override
>     public void messageArrived(String topic, MqttMessage message) throws
> Exception {
>         System.out.println("Client: " + clientId + " - Message arrived on
> topic: " + topic + " - message: " + new String(message.getPayload()));
>     }
>
>     @Override
>     public void deliveryComplete(IMqttDeliveryToken token) {
>         System.out.println("Client: " + clientId + " - Delivery completed:
> " + token.getTopics()[0]);
>     }
>
>     @Override
>     public void connectionLost(Throwable cause) {
>         System.out.println("Client: " + clientId + " - Connection lost: "
> + cause.getMessage());
>         cause.printStackTrace();
>     }
> }
>

Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

Posted by Justin Bertram <jb...@apache.org>.
I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.

However, it's worth noting that your code will still not work as it did
before because the previous behavior violated the MQTT 3.1.1 specification.
As noted previously, both MQTT 3.1.1 and 5 specifications contain
MQTT-4.7.2-1 which states:

> The Server MUST NOT match Topic Filters starting with a wildcard
> character (# or +) with Topic Names beginning with a $ character.

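Previously your "test-client-admin" client was subscribing to `#` and
receiving messages from topics beginning with `$`. This won't work anymore.
To keep receiving those messages, the client must subscribe with a filter
that itself begins with `$`, e.g. `$EDC/#`.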


Justin

[1] https://issues.apache.org/jira/browse/ARTEMIS-3801

On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
<Ri...@eurotech.com.invalid> wrote:

>
> Hello,
>     switching from Artemis broker 2.20 to 2.21 we experienced an issue
> about message delivering.
> It looks like MQTT devices don’t receive the messages matching their
> subscriptions.
>
> The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> (wildcard addresses modified as (**)) produces the correct output:
> waiting for messages
> Client: test-client-1 - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> Client: test-client-1 - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> ===
> Client: test-client-2 - Delivery completed:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> Client: test-client-2 - Message arrived on topic:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> ===
> Client: test-client-admin - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/APPS
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> Client: test-client-1 - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> …
>
> With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> target the output is:
> waiting for messages
> ===
> Client: test-client-1 - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> ===
> Client: test-client-2 - Delivery completed:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> ===
> Client: test-client-admin - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/APPS
> …
>
> So the broker doesn’t send any message to the clients.
>
> May be we missed to configure something needed by 2.21 versions onward?
>
> Regards,
> Riccardo Modanese
>
>
>
> (*) The 2.21 and 2.22 default broker.xml configuration file has changed in
> this way:
> set the broker name (message-broker)
> removed double connector bound to 1883 (the broker with the default
> configuration crashed)
> allow only MQTT protocol for connector bound to 1883 port
> removed broadcast connector and configuration
> added custom wildcard configuration (**)
>
> (**) <wildcard-addresses>
>          <routing-enabled>true</routing-enabled>
>          <delimiter>/</delimiter>
>           <any-words>#</any-words>
>           <single-word>+</single-word>
>       </wildcard-addresses>
>
> (***)
>
> /*******************************************************************************
> * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> *
> * This program and the accompanying materials are made
> * available under the terms of the Eclipse Public License 2.0
> * which is available at https://www.eclipse.org/legal/epl-2.0/
> *
> * SPDX-License-Identifier: EPL-2.0
> *
> * Contributors:
> *     Eurotech - initial API and implementation
>
> *******************************************************************************/
> package org.eclipse.kapua.qa.common;
>
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class TestMqttClient {
>
>     protected static Logger logger =
> LoggerFactory.getLogger(TestMqttClient.class);
>
>     private static final String SERVER_URI = "tcp://localhost:1883";
>     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>     private static final String CLIENT_ID_1 = "test-client-1";
>     private static final String CLIENT_ID_2 = "test-client-2";
>     private static final String USERNAME = "kapua-broker";
>     private static final String PASSWORD = "kapua-password";
>     private static final String USERNAME_ADMIN = "kapua-sys";
>     private static final String PASSWORD_ADMIN = "kapua-password";
>
>     private TestMqttClient() {
>     }
>
>     public static void main(String argv[]) throws MqttException {
>         MqttClient clientAdmin = new MqttClient(SERVER_URI,
> CLIENT_ID_ADMIN, new MemoryPersistence());
>         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new
> MemoryPersistence());
>         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new
> MemoryPersistence());
>         clientAdmin.setCallback(new
> TestMqttClientCallback(CLIENT_ID_ADMIN));
>         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>
>         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
> PASSWORD_ADMIN));
>         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         System.out.println("waiting for messages");
>         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>         clientAdmin.subscribe("#");
>
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new
> MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new
> MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>
>         clientAdmin.disconnect();
>         client1.disconnect();
>         client2.disconnect();
>     }
>
>     private static MqttConnectOptions getMqttConnectOptions(String
> username, String password) {
>         MqttConnectOptions options = new MqttConnectOptions();
>         options.setCleanSession(true);
>         options.setUserName(username);
>         options.setPassword(password.toCharArray());
>         return options;
>     }
> }
>
> class TestMqttClientCallback implements MqttCallback {
>
>     private String clientId;
>
>     TestMqttClientCallback(String clientId) {
>         this.clientId = clientId;
>     }
>
>     @Override
>     public void messageArrived(String topic, MqttMessage message) throws
> Exception {
>         System.out.println("Client: " + clientId + " - Message arrived on
> topic: " + topic + " - message: " + new String(message.getPayload()));
>     }
>
>     @Override
>     public void deliveryComplete(IMqttDeliveryToken token) {
>         System.out.println("Client: " + clientId + " - Delivery completed:
> " + token.getTopics()[0]);
>     }
>
>     @Override
>     public void connectionLost(Throwable cause) {
>         System.out.println("Client: " + clientId + " - Connection lost: "
> + cause.getMessage());
>         cause.printStackTrace();
>     }
> }
>

Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

Posted by Justin Bertram <jb...@apache.org>.
I've reproduced this problem and I'm investigating. At first glance it
appears related to the implementation of MQTT-4.7.2-1 which states:

> The Server MUST NOT match Topic Filters starting with a wildcard
> character (# or +) with Topic Names beginning with a $ character.

This is part of both MQTT 3.1.1 and 5 specifications, but it wasn't
implemented previously. I implemented it as part of the work for MQTT 5
which is why the problem is showing up in the latest release.


Justin

On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
<Ri...@eurotech.com.invalid> wrote:

>
> Hello,
>     switching from Artemis broker 2.20 to 2.21 we experienced an issue
> about message delivering.
> It looks like MQTT devices don’t receive the messages matching their
> subscriptions.
>
> The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> (wildcard addresses modified as (**)) produces the correct output:
> waiting for messages
> Client: test-client-1 - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> Client: test-client-1 - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> ===
> Client: test-client-2 - Delivery completed:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> Client: test-client-2 - Message arrived on topic:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> ===
> Client: test-client-admin - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/APPS
> Client: test-client-admin - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> Client: test-client-1 - Message arrived on topic:
> $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> …
>
> With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> target the output is:
> waiting for messages
> ===
> Client: test-client-1 - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> ===
> Client: test-client-2 - Delivery completed:
> $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> ===
> Client: test-client-admin - Delivery completed:
> $EDC/kapua-sys/test-client-1/MQTT/APPS
> …
>
> So the broker doesn’t send any message to the clients.
>
> May be we missed to configure something needed by 2.21 versions onward?
>
> Regards,
> Riccardo Modanese
>
>
>
> (*) The 2.21 and 2.22 default broker.xml configuration file has changed in
> this way:
> set the broker name (message-broker)
> removed double connector bound to 1883 (the broker with the default
> configuration crashed)
> allow only MQTT protocol for connector bound to 1883 port
> removed broadcast connector and configuration
> added custom wildcard configuration (**)
>
> (**) <wildcard-addresses>
>          <routing-enabled>true</routing-enabled>
>          <delimiter>/</delimiter>
>           <any-words>#</any-words>
>           <single-word>+</single-word>
>       </wildcard-addresses>
>
> (***)
>
> /*******************************************************************************
> * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> *
> * This program and the accompanying materials are made
> * available under the terms of the Eclipse Public License 2.0
> * which is available at https://www.eclipse.org/legal/epl-2.0/
> *
> * SPDX-License-Identifier: EPL-2.0
> *
> * Contributors:
> *     Eurotech - initial API and implementation
>
> *******************************************************************************/
> package org.eclipse.kapua.qa.common;
>
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class TestMqttClient {
>
>     protected static Logger logger =
> LoggerFactory.getLogger(TestMqttClient.class);
>
>     private static final String SERVER_URI = "tcp://localhost:1883";
>     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>     private static final String CLIENT_ID_1 = "test-client-1";
>     private static final String CLIENT_ID_2 = "test-client-2";
>     private static final String USERNAME = "kapua-broker";
>     private static final String PASSWORD = "kapua-password";
>     private static final String USERNAME_ADMIN = "kapua-sys";
>     private static final String PASSWORD_ADMIN = "kapua-password";
>
>     private TestMqttClient() {
>     }
>
>     public static void main(String argv[]) throws MqttException {
>         MqttClient clientAdmin = new MqttClient(SERVER_URI,
> CLIENT_ID_ADMIN, new MemoryPersistence());
>         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new
> MemoryPersistence());
>         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new
> MemoryPersistence());
>         clientAdmin.setCallback(new
> TestMqttClientCallback(CLIENT_ID_ADMIN));
>         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>
>         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
> PASSWORD_ADMIN));
>         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         System.out.println("waiting for messages");
>         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>         clientAdmin.subscribe("#");
>
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new
> MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new
> MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
> new MqttMessage("test".getBytes()));
>         System.out.println("===");
>
>         clientAdmin.disconnect();
>         client1.disconnect();
>         client2.disconnect();
>     }
>
>     private static MqttConnectOptions getMqttConnectOptions(String
> username, String password) {
>         MqttConnectOptions options = new MqttConnectOptions();
>         options.setCleanSession(true);
>         options.setUserName(username);
>         options.setPassword(password.toCharArray());
>         return options;
>     }
> }
>
> class TestMqttClientCallback implements MqttCallback {
>
>     private String clientId;
>
>     TestMqttClientCallback(String clientId) {
>         this.clientId = clientId;
>     }
>
>     @Override
>     public void messageArrived(String topic, MqttMessage message) throws
> Exception {
>         System.out.println("Client: " + clientId + " - Message arrived on
> topic: " + topic + " - message: " + new String(message.getPayload()));
>     }
>
>     @Override
>     public void deliveryComplete(IMqttDeliveryToken token) {
>         System.out.println("Client: " + clientId + " - Delivery completed:
> " + token.getTopics()[0]);
>     }
>
>     @Override
>     public void connectionLost(Throwable cause) {
>         System.out.println("Client: " + clientId + " - Connection lost: "
> + cause.getMessage());
>         cause.printStackTrace();
>     }
> }
>