You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@activemq.apache.org by "Clebert Suconic (Jira)" <ji...@apache.org> on 2022/04/28 15:15:00 UTC
[jira] [Closed] (ARTEMIS-3801) Not getting messages on MQTT subscriptions with $
[ https://issues.apache.org/jira/browse/ARTEMIS-3801?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Clebert Suconic closed ARTEMIS-3801.
------------------------------------
Resolution: Fixed
> Not getting messages on MQTT subscriptions with $
> -------------------------------------------------
>
> Key: ARTEMIS-3801
> URL: https://issues.apache.org/jira/browse/ARTEMIS-3801
> Project: ActiveMQ Artemis
> Issue Type: Bug
> Reporter: Justin Bertram
> Assignee: Justin Bertram
> Priority: Major
> Fix For: 2.22.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> Switching from Artemis broker 2.20 to 2.21 we experienced an issue about message delivering.
> It looks like MQTT devices don’t receive the messages matching their subscriptions.
> The test code (\*\*\*) run with an Artemis broker 2.19 or 2.20 as target (wildcard addresses modified as (\*\*)) produces the correct output:
> {noformat}
> waiting for messages
> Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/BIRTH - message: test
> ===
> Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> Client: test-client-2 - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-2/MQTT/BIRTH - message: test
> ===
> Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
> Client: test-client-admin - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> Client: test-client-1 - Message arrived on topic: $EDC/kapua-sys/test-client-1/MQTT/APPS - message: test
> …{noformat}
> With broker 2.21 or 2.22 (configuration changes described in (\*) ) as target the output is:
> {noformat}
> waiting for messages
> ===
> Client: test-client-1 - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/BIRTH
> ===
> Client: test-client-2 - Delivery completed: $EDC/kapua-sys/test-client-2/MQTT/BIRTH
> ===
> Client: test-client-admin - Delivery completed: $EDC/kapua-sys/test-client-1/MQTT/APPS
> …{noformat}
> So the broker doesn’t send any message to the clients. May be we missed to configure something needed by 2.21 versions onward?
> \*:
> The 2.21 and 2.22-SNAPSHOT default {{broker.xml}} configuration file has changed in this way:
> - set the broker name (message-broker)
> - removed double connector bound to 1883 (the broker with the default configuration crashed)
> - allow only MQTT protocol for connector bound to 1883 port
> - removed broadcast connector and configuration
> - added custom wildcard configuration (\*\*)
> \*\*:
> {code:xml}
> <wildcard-addresses>
> <routing-enabled>true</routing-enabled>
> <delimiter>/</delimiter>
> <any-words>#</any-words>
> <single-word>+</single-word>
> </wildcard-addresses>{code}
> \*\*\*:
> {code:java}
> /*******************************************************************************
> * Copyright (c) 2021, 2022 Eurotech and/or its affiliates and others
> *
> * This program and the accompanying materials are made
> * available under the terms of the Eclipse Public License 2.0
> * which is available at https://www.eclipse.org/legal/epl-2.0/
> *
> * SPDX-License-Identifier: EPL-2.0
> *
> * Contributors:
> * Eurotech - initial API and implementation
> *******************************************************************************/
> package org.eclipse.kapua.qa.common;
> import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
> import org.eclipse.paho.client.mqttv3.MqttCallback;
> import org.eclipse.paho.client.mqttv3.MqttClient;
> import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
> import org.eclipse.paho.client.mqttv3.MqttException;
> import org.eclipse.paho.client.mqttv3.MqttMessage;
> import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> public class TestMqttClient {
> protected static Logger logger = LoggerFactory.getLogger(TestMqttClient.class);
> private static final String SERVER_URI = "tcp://localhost:1883";
> private static final String CLIENT_ID_ADMIN = "test-client-admin";
> private static final String CLIENT_ID_1 = "test-client-1";
> private static final String CLIENT_ID_2 = "test-client-2";
> private static final String USERNAME = "kapua-broker";
> private static final String PASSWORD = "kapua-password";
> private static final String USERNAME_ADMIN = "kapua-sys";
> private static final String PASSWORD_ADMIN = "kapua-password";
> private TestMqttClient() {
> }
> public static void main(String argv[]) throws MqttException {
> MqttClient clientAdmin = new MqttClient(SERVER_URI, CLIENT_ID_ADMIN, new MemoryPersistence());
> MqttClient client1 = new MqttClient(SERVER_URI, CLIENT_ID_1, new MemoryPersistence());
> MqttClient client2 = new MqttClient(SERVER_URI, CLIENT_ID_2, new MemoryPersistence());
> clientAdmin.setCallback(new TestMqttClientCallback(CLIENT_ID_ADMIN));
> client1.setCallback(new TestMqttClientCallback(CLIENT_ID_1));
> client2.setCallback(new TestMqttClientCallback(CLIENT_ID_2));
> clientAdmin.connect(getMqttConnectOptions(USERNAME_ADMIN, PASSWORD_ADMIN));
> client1.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> client2.connect(getMqttConnectOptions(USERNAME, PASSWORD));
> System.out.println("waiting for messages");
> client1.subscribe("$EDC/kapua-sys/" + CLIENT_ID_1 + "/#");
> client2.subscribe("$EDC/kapua-sys/" + CLIENT_ID_2 + "/#");
> clientAdmin.subscribe("#");
> client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
> System.out.println("===");
> client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/BIRTH", new MqttMessage("test".getBytes()));
> System.out.println("===");
> clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
> System.out.println("===");
> clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/APPS", new MqttMessage("test".getBytes()));
> System.out.println("===");
> client1.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> System.out.println("===");
> client2.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> System.out.println("===");
> clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_1 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> System.out.println("===");
> clientAdmin.publish("$EDC/kapua-sys/" + CLIENT_ID_2 + "/MQTT/DC", new MqttMessage("test".getBytes()));
> System.out.println("===");
> clientAdmin.disconnect();
> client1.disconnect();
> client2.disconnect();
> }
> private static MqttConnectOptions getMqttConnectOptions(String username, String password) {
> MqttConnectOptions options = new MqttConnectOptions();
> options.setCleanSession(true);
> options.setUserName(username);
> options.setPassword(password.toCharArray());
> return options;
> }
> }
> class TestMqttClientCallback implements MqttCallback {
> private String clientId;
> TestMqttClientCallback(String clientId) {
> this.clientId = clientId;
> }
> @Override
> public void messageArrived(String topic, MqttMessage message) throws Exception {
> System.out.println("Client: " + clientId + " - Message arrived on topic: " + topic + " - message: " + new String(message.getPayload()));
> }
> @Override
> public void deliveryComplete(IMqttDeliveryToken token) {
> System.out.println("Client: " + clientId + " - Delivery completed: " + token.getTopics()[0]);
> }
> @Override
> public void connectionLost(Throwable cause) {
> System.out.println("Client: " + clientId + " - Connection lost: " + cause.getMessage());
> cause.printStackTrace();
> }
> }{code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)