You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2018/01/18 14:59:51 UTC

[2/2] activemq-artemis git commit: ARTEMIS-1617 - Properly set autoCreated flag on address

ARTEMIS-1617 - Properly set autoCreated flag on address

The autoCreated flag needs to be set when auto-creating an address for
an MQTT subscription, so that the address can be removed later if
auto-delete is configured.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3aef7caa
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3aef7caa
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3aef7caa

Branch: refs/heads/master
Commit: 3aef7caac6b63567df4705869ba9609c53c0a8ec
Parents: 0d9a114
Author: Christopher L. Shannon (cshannon) <ch...@gmail.com>
Authored: Thu Jan 18 08:36:07 2018 -0500
Committer: Justin Bertram <jb...@apache.org>
Committed: Thu Jan 18 08:59:35 2018 -0600

----------------------------------------------------------------------
 .../protocol/mqtt/MQTTSubscriptionManager.java     | 16 +++++++++-------
 .../tests/integration/mqtt/imported/MQTTTest.java  | 17 +++++++++++++++++
 2 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aef7caa/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index ae6b56c..49ab5d9 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -24,28 +24,29 @@ import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 
-import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.FilterConstants;
+import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
 import org.apache.activemq.artemis.core.server.Queue;
-import org.apache.activemq.artemis.api.core.RoutingType;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.apache.activemq.artemis.utils.CompositeAddress;
 
+import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+
 public class MQTTSubscriptionManager {
 
-   private MQTTSession session;
+   private final MQTTSession session;
 
-   private ConcurrentMap<Long, Integer> consumerQoSLevels;
+   private final ConcurrentMap<Long, Integer> consumerQoSLevels;
 
-   private ConcurrentMap<String, ServerConsumer> consumers;
+   private final ConcurrentMap<String, ServerConsumer> consumers;
 
    // We filter out Artemis management messages and notifications
-   private SimpleString managementFilter;
+   private final SimpleString managementFilter;
 
    public MQTTSubscriptionManager(MQTTSession session) {
       this.session = session;
@@ -108,7 +109,8 @@ public class MQTTSubscriptionManager {
             if (!bindingQueryResult.isAutoCreateAddresses()) {
                throw ActiveMQMessageBundle.BUNDLE.addressDoesNotExist(SimpleString.toSimpleString(address));
             }
-            addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address), RoutingType.MULTICAST, false);
+            addressInfo = session.getServerSession().createAddress(SimpleString.toSimpleString(address),
+                                                                   RoutingType.MULTICAST, true);
          }
          return findOrCreateQueue(bindingQueryResult, addressInfo, queue, qos);
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3aef7caa/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index d5978b0..bfc83e0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -48,6 +48,7 @@ import org.apache.activemq.artemis.core.protocol.mqtt.MQTTUtil;
 import org.apache.activemq.artemis.core.server.ActiveMQServer;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.impl.AddressInfo;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
 import org.apache.activemq.artemis.tests.util.Wait;
 import org.apache.activemq.transport.amqp.client.AmqpClient;
 import org.apache.activemq.transport.amqp.client.AmqpConnection;
@@ -1946,4 +1947,20 @@ public class MQTTTest extends MQTTTestSupport {
             connection2.disconnect();
       }
    }
+
+   @Test
+   public void autoDestroyAddress() throws Exception {
+      AddressSettings addressSettings = new AddressSettings();
+      addressSettings.setAutoDeleteAddresses(true);
+      server.getAddressSettingsRepository().addMatch("foo.bar", addressSettings);
+
+      final MQTTClientProvider subscriptionProvider = getMQTTClientProvider();
+      initializeConnection(subscriptionProvider);
+      subscriptionProvider.subscribe("foo/bar", AT_MOST_ONCE);
+      assertNotNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar")));
+
+      subscriptionProvider.disconnect();
+
+      assertNull(server.getAddressInfo(SimpleString.toSimpleString("foo.bar")));
+   }
 }