You are viewing a plain text version of this content. The canonical link for it is here.
Posted to users@activemq.apache.org by Justin Bertram <jb...@apache.org> on 2022/05/02 18:37:27 UTC

Re: Artemis 2.21+ - MQTT devices don’t receive the messages matching their subscriptions

> Now the $EDC/# is not working due to ARTEMIS-3801 right?

I believe that is the case, yes.


Justin

On Fri, Apr 29, 2022 at 2:40 AM Modanese, Riccardo
<Ri...@eurotech.com.invalid> wrote:

> From my understanding our previous subscription
>     ?#?
> Should be changed into 2 different subscriptions:
>     ?#?
>     ?$EDC/#?
>
> Now the $EDC/# is not working due to ARTEMIS-3801 right?
> If so I think it?s not a big problem we should manage 2 different
> subscriptions instead of one.
>
> Thanks
>
>
> Da: Justin Bertram <jb...@apache.org>
> Data: marted?, 26 aprile 2022 22:24
> A: users@activemq.apache.org <us...@activemq.apache.org>
> Oggetto: Re: Artemis 2.21+ - MQTT devices don?t receive the messages
> matching their subscriptions
> I believe I see why this is working for you in ActiveMQ "Classic."
>
> By default ActiveMQ "Classic" will block any messages published to an MQTT
> topic starting with '$' [1]. However, a configuration element named
> "publishDollarTopics"
> was added [2] a long time ago to assist "legacy MQTT applications"
> (presumably those migrating from 3.1 to 3.1.1 since the 3.1 spec makes no
> mention of topics starting with '$'). You are actually setting this
> parameter in Kapua [3].
>
> A similar parameter could potentially be added to ActiveMQ Artemis, but
> it's been 8 years since MQTT 3.1.1 was released and 3 years since MQTT 5
> was released. Any application written against MQTT 3.1 has had ample time
> to be updated, and it's not like this is a gray area in the spec. Both
> 3.1.1 and 5 specifications are very clear about how to treat topics that
> start with '$'.
>
> Is it reasonable to expect that Kapua would change to be spec compliant?
>
>
> Justin
>
> [1]
>
> https://github.com/apache/activemq/blob/59dfbc3302c43054d24a96e699b0eaf1b6d1bf98/activemq-mqtt/src/main/java/org/apache/activemq/transport/mqtt/MQTTProtocolConverter.java#L159
> [2] https://issues.apache.org/jira/browse/AMQ-5292
> [3]
>
> https://github.com/eclipse/kapua/blob/master/org.eclipse.kapua.assembly/src/main/resources/conf/broker/activemq.xml#L434
>
> On Tue, Apr 26, 2022 at 9:42 AM Justin Bertram <jb...@apache.org>
> wrote:
>
> > > It?s a huge problem for us since in Kapua we are doing # subscriptions
> > (for internal components) and using $ as topic prefix (also Kura is using
> > this in its namespace) and worked perfectly with ActiveMQ 5.x.
> >
> > This use-case is clearly prohibited by the MQTT specification. It
> > specifically says:
> >
> >   A subscription to ?#? will not receive any messages published to a
> topic
> > beginning with a $
> >
> > If this is working on ActiveMQ 5.x then ActiveMQ 5.x should be fixed to
> > enforce the MQTT specification appropriately.
> >
> > > I should evaluate the impact but I?m not so sure we can move to Artemis
> > with this limitation.
> >
> > I'm not sure I would categorize this as a "limitation" unless you believe
> > enforcing the specification is a limitation.
> >
> > For what it's worth the MQTT specification also says:
> >
> >   Applications cannot use a topic with a leading $ character for their
> own
> > purposes
> >
> > At this point I recommend you change your application to conform with the
> > specification.
> >
> >
> > Justin
> >
> > On Tue, Apr 26, 2022 at 2:19 AM Modanese, Riccardo
> > <Ri...@eurotech.com.invalid> wrote:
> >
> >> It?s a huge problem for us since in Kapua we are doing # subscriptions
> >> (for internal components) and using $ as topic prefix (also Kura is
> using
> >> this in its namespace) and worked perfectly with ActiveMQ 5.x.
> >>
> >> I should evaluate the impact but I?m not so sure we can move to Artemis
> >> with this limitation.
> >>
> >> Riccardo
> >>
> >> Da: Justin Bertram <jb...@apache.org>
> >> Data: marted?, 26 aprile 2022 00:18
> >> A: users@activemq.apache.org <us...@activemq.apache.org>
> >> Oggetto: Re: Artemis 2.21+ - MQTT devices don?t receive the messages
> >> matching their subscriptions
> >> I opened ARTEMIS-3801 [1] and sent a PR to fix what was broken.
> >>
> >> However, it's worth noting that your code will still not work as it did
> >> before because the previous behavior violated the MQTT 3.1.1
> >> specification.
> >> As noted previously, both MQTT 3.1.1 and 5 specifications contain
> >> MQTT-4.7.2-1 which states:
> >>
> >> > The Server MUST NOT match Topic Filters starting with a wildcard
> >> character (# or +) with Topic Names beginning with a $ character.
> >>
> >> Previously your "test-client-admin" client was subscribing to `#` and
> >> receiving messages from topics beginning with `$`. This won't work
> >> anymore.
> >>
> >>
> >> Justin
> >>
> >> [1] https://issues.apache.org/jira/browse/ARTEMIS-3801
> >>
> >> On Wed, Apr 20, 2022 at 4:29 AM Modanese, Riccardo
> >> <Ri...@eurotech.com.invalid> wrote:
> >>
> >> >
> >> > Hello,
> >> >     switching from Artemis broker 2.20 to 2.21 we experienced an issue
> >> > about message delivering.
> >> > It looks like MQTT devices don?t receive the messages matching their
> >> > subscriptions.
> >> >
> >> > The test code (***) run with an Artemis broker 2.19 or 2.20 as target
> >> > (wildcard addresses modified as (**)) produces the correct output:
> >> > waiting for messages
> >> > Client: test-client-1 - Delivery completed:
> >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> >> > Client: test-client-admin - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> >> > Client: test-client-1 - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> >> > ===
> >> > Client: test-client-2 - Delivery completed:
> >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> >> > Client: test-client-2 - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> >> > Client: test-client-admin - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> >> > ===
> >> > Client: test-client-admin - Delivery completed:
> >> > $EDC/kapua-sys/test-client-1/MQTT/APPS
> >> > Client: test-client-admin - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> >> > Client: test-client-1 - Message arrived on topic:
> >> > $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> >> > ?
> >> >
> >> > With broker 2.21 or 2.22 (configuration changes described in (*) ) as
> >> > target the output is:
> >> > waiting for messages
> >> > ===
> >> > Client: test-client-1 - Delivery completed:
> >> > $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> >> > ===
> >> > Client: test-client-2 - Delivery completed:
> >> > $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> >> > ===
> >> > Client: test-client-admin - Delivery completed:
> >> > $EDC/kapua-sys/test-client-1/MQTT/APPS
> >> > ?
> >> >
> >> > So the broker doesn?t send any message to the clients.
> >> >
> >> > May be we missed to configure something needed by 2.21 versions
> onward?
> >> >
> >> > Regards,
> >> > Riccardo Modanese
> >> >
> >> >
> >> >
> >> > (*) The 2.21 and 2.22 default broker.xml configuration file has
> changed
> >> in
> >> > this way:
> >> > set the broker name (message-broker)
> >> > removed double connector bound to 1883 (the broker with the default
> >> > configuration crashed)
> >> > allow only MQTT protocol for connector bound to 1883 port
> >> > removed broadcast connector and configuration
> >> > added custom wildcard configuration (**)
> >> >
> >> > (**) <wildcard-addresses>
> >> >          <routing-enabled>true</routing-enabled>
> >> >          <delimiter>/</delimiter>
> >> >           <any-words>#</any-words>
> >> >           <single-word>+</single-word>
> >> >       </wildcard-addresses>
> >> >
> >> > (***)
> >> >
> >> >
> >>
> /*******************************************************************************
> >> > * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> >> > *
> >> > * This program and the accompanying materials are made
> >> > * available under the terms of the Eclipse Public License 2.0
> >> > * which is available at https://www.eclipse.org/legal/epl-2.0/
> >> > *
> >> > * SPDX-License-Identifier: EPL-2.0
> >> > *
> >> > * Contributors:
> >> > *     Eurotech - initial API and implementation
> >> >
> >> >
> >>
> *******************************************************************************/
> >> > package org.eclipse.kapua.qa.common;
> >> >
> >> > import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> >> > import org.eclipse.paho.client.mqttv3.MqttCallback;
> >> > import org.eclipse.paho.client.mqttv3.MqttClient;
> >> > import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> >> > import org.eclipse.paho.client.mqttv3.MqttException;
> >> > import org.eclipse.paho.client.mqttv3.MqttMessage;
> >> > import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> >> > import org.slf4j.Logger;
> >> > import org.slf4j.LoggerFactory;
> >> >
> >> > public class TestMqttClient {
> >> >
> >> >     protected static Logger logger =
> >> > LoggerFactory.getLogger(TestMqttClient.class);
> >> >
> >> >     private static final String SERVER_URI = "tcp://localhost:1883";
> >> >     private static final String CLIENT_ID_ADMIN = "test-client-admin";
> >> >     private static final String CLIENT_ID_1 = "test-client-1";
> >> >     private static final String CLIENT_ID_2 = "test-client-2";
> >> >     private static final String USERNAME = "kapua-broker";
> >> >     private static final String PASSWORD = "kapua-password";
> >> >     private static final String USERNAME_ADMIN = "kapua-sys";
> >> >     private static final String PASSWORD_ADMIN = "kapua-password";
> >> >
> >> >     private TestMqttClient() {
> >> >     }
> >> >
> >> >     public static void main(String argv[]) throws MqttException {
> >> >         MqttClient clientAdmin = new MqttClient(SERVER_URI,
> >> > CLIENT_ID_ADMIN, new MemoryPersistence());
> >> >         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1,
> new
> >> > MemoryPersistence());
> >> >         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2,
> new
> >> > MemoryPersistence());
> >> >         clientAdmin.setCallback(new
> >> > TestMqttClientCallback(CLIENT_ID_ADMIN));
> >> >         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
> >> >         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
> >> >
> >> >         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN,
> >> > PASSWORD_ADMIN));
> >> >         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >> >         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> >> >         System.out.println("waiting for messages");
> >> >         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
> >> >         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
> >> >         clientAdmin.subscribe("#");
> >> >
> >> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> "/MQTT/BIRTH",
> >> > new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 +
> "/MQTT/BIRTH",
> >> > new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> >> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> >> > "/MQTT/APPS", new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >
> >> >         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC",
> >> new
> >> > MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC",
> >> new
> >> > MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 +
> >> "/MQTT/DC",
> >> > new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 +
> >> "/MQTT/DC",
> >> > new MqttMessage("test".getBytes()));
> >> >         System.out.println("===");
> >> >
> >> >         clientAdmin.disconnect();
> >> >         client1.disconnect();
> >> >         client2.disconnect();
> >> >     }
> >> >
> >> >     private static MqttConnectOptions getMqttConnectOptions(String
> >> > username, String password) {
> >> >         MqttConnectOptions options = new MqttConnectOptions();
> >> >         options.setCleanSession(true);
> >> >         options.setUserName(username);
> >> >         options.setPassword(password.toCharArray());
> >> >         return options;
> >> >     }
> >> > }
> >> >
> >> > class TestMqttClientCallback implements MqttCallback {
> >> >
> >> >     private String clientId;
> >> >
> >> >     TestMqttClientCallback(String clientId) {
> >> >         this.clientId = clientId;
> >> >     }
> >> >
> >> >     @Override
> >> >     public void messageArrived(String topic, MqttMessage message)
> throws
> >> > Exception {
> >> >         System.out.println("Client: " + clientId + " - Message arrived
> >> on
> >> > topic: " + topic + " - message: " + new String(message.getPayload()));
> >> >     }
> >> >
> >> >     @Override
> >> >     public void deliveryComplete(IMqttDeliveryToken token) {
> >> >         System.out.println("Client: " + clientId + " - Delivery
> >> completed:
> >> > " + token.getTopics()[0]);
> >> >     }
> >> >
> >> >     @Override
> >> >     public void connectionLost(Throwable cause) {
> >> >         System.out.println("Client: " + clientId + " - Connection
> lost:
> >> "
> >> > + cause.getMessage());
> >> >         cause.printStackTrace();
> >> >     }
> >> > }
> >> >
> >>
> >
>