You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Clebert Suconic (Jira)" <ji...@apache.org> on 2022/04/28 15:15:00 UTC

[jira] [Closed] (ARTEMIS-3801) Not getting messages on MQTT subscriptions with $

     [ https://issues.apache.org/jira/browse/ARTEMIS-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Clebert Suconic closed ARTEMIS-3801.
------------------------------------
    Resolution: Fixed

> Not getting messages on MQTT subscriptions with $
> -------------------------------------------------
>
>                 Key: ARTEMIS-3801
>                 URL: https://issues.apache.org/jira/browse/ARTEMIS-3801
>             Project: ActiveMQ Artemis
>          Issue Type: Bug
>            Reporter: Justin Bertram
>            Assignee: Justin Bertram
>            Priority: Major
>             Fix For: 2.22.0
>
>          Time Spent: 10m
>  Remaining Estimate: 0h
>
> Switching from Artemis broker 2.20 to 2.21 we experienced an issue about message delivering.
> It looks like MQTT devices don’t receive the messages matching their subscriptions.
> The test code (\*\*\*) run with an Artemis broker 2.19 or 2.20 as target (wildcard addresses modified as (\*\*)) produces the correct output:
> {noformat}
> waiting for messages
> Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> ===
> Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> ===
> Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
> Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> …{noformat}
> With broker 2.21 or 2.22 (configuration changes described in (\*) ) as target the output is:
> {noformat}
> waiting for messages
> ===
> Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> ===
> Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> ===
> Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
> …{noformat}
> So the broker doesn’t send any message to the clients. May be we missed to configure something needed by 2.21 versions onward?
> \*:
> The 2.21 and 2.22-SNAPSHOT default {{broker.xml}} configuration file has changed in this way:
>  - set the broker name (message-broker)
>  - removed double connector bound to 1883 (the broker with the default configuration crashed)
>  - allow only MQTT protocol for connector bound to 1883 port
>  - removed broadcast connector and configuration
>  - added custom wildcard configuration (\*\*)
> \*\*:
> {code:xml}
>  <wildcard-addresses>
>     <routing-enabled>true</routing-enabled>
>     <delimiter>/</delimiter>
>     <any-words>#</any-words>
>     <single-word>+</single-word>
> </wildcard-addresses>{code}
> \*\*\*:
> {code:java}
> /*******************************************************************************
> * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> *
> * This program and the accompanying materials are made
> * available under the terms of the Eclipse Public License 2.0
> * which is available at https://www.eclipse.org/legal/epl-2.0/
> *
> * SPDX-License-Identifier: EPL-2.0
> *
> * Contributors:
> *     Eurotech - initial API and implementation
> *******************************************************************************/
> package org.eclipse.kapua.qa.common;
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> public class TestMqttClient {
>     protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class);
>     private static final String SERVER_URI = "tcp://localhost:1883";
>     private static final String CLIENT_ID_ADMIN = "test-client-admin";
>     private static final String CLIENT_ID_1 = "test-client-1";
>     private static final String CLIENT_ID_2 = "test-client-2";
>     private static final String USERNAME = "kapua-broker";
>     private static final String PASSWORD = "kapua-password";
>     private static final String USERNAME_ADMIN = "kapua-sys";
>     private static final String PASSWORD_ADMIN = "kapua-password";
>     private TestMqttClient() {
>     }
>     public static void main(String argv[]) throws MqttException {
>         MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence());
>         MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence());
>         MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence());
>         clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN));
>         client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
>         client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
>         clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN));
>         client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
>         System.out.println("waiting for messages");
>         client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
>         client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
>         clientAdmin.subscribe("#");
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
>         System.out.println("===");
>         clientAdmin.disconnect();
>         client1.disconnect();
>         client2.disconnect();
>     }
>     private static MqttConnectOptions getMqttConnectOptions(String username, String password) {
>         MqttConnectOptions options = new MqttConnectOptions();
>         options.setCleanSession(true);
>         options.setUserName(username);
>         options.setPassword(password.toCharArray());
>         return options;
>     }
> }
> class TestMqttClientCallback implements MqttCallback {
>     private String clientId;
>     TestMqttClientCallback(String clientId) {
>         this.clientId = clientId;
>     }
>     @Override
>     public void messageArrived(String topic, MqttMessage message) throws Exception {
>         System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + new String(message.getPayload()));
>     }
>     @Override
>     public void deliveryComplete(IMqttDeliveryToken token) {
>         System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]);
>     }
>     @Override
>     public void connectionLost(Throwable cause) {
>         System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage());
>         cause.printStackTrace();
>     }
> }{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)