You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/11/01 15:15:55 UTC

activemq-artemis git commit: Added MQTT DeleteOnNoConsumer Error

Repository: activemq-artemis
Updated Branches:
  refs/heads/ARTEMIS-780 5f428e64a -> 2972e092e


Added MQTT DeleteOnNoConsumer Error


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2972e092
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2972e092
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2972e092

Branch: refs/heads/ARTEMIS-780
Commit: 2972e092e9f0f67b98f95d189974538c2b5b4893
Parents: 5f428e6
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Nov 1 15:15:32 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Tue Nov 1 15:15:32 2016 +0000

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQExceptionType.java |  6 ++++
 .../core/ActiveMQInvalidQueueConfiguration.java | 31 ++++++++++++++++++++
 .../protocol/mqtt/MQTTSubscriptionManager.java  |  8 +++--
 .../core/server/ActiveMQMessageBundle.java      |  4 +++
 .../artemis/core/server/ActiveMQServer.java     | 19 ++++++++++++
 .../artemis/core/server/ServerSession.java      |  8 +++++
 .../core/server/impl/ActiveMQServerImpl.java    | 25 ++++++++++++++++
 .../core/server/impl/ServerSessionImpl.java     | 15 ++++++++--
 .../integration/mqtt/imported/MQTTTest.java     | 31 ++++++++++++++++++--
 9 files changed, 141 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index 309a8c4..785dac3 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -225,6 +225,12 @@ public enum ActiveMQExceptionType {
       public ActiveMQException createException(String msg) {
          return new ActiveMQUnexpectedRoutingTypeForAddress(msg);
       }
+   },
+   INVALID_QUEUE_CONFIGURATION(216) {
+      @Override
+      public ActiveMQException createException(String msg) {
+         return new ActiveMQInvalidQueueConfiguration(msg);
+      }
    };
 
    private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java
new file mode 100644
index 0000000..521a266
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQInvalidQueueConfiguration.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * An operation failed because a queue's configuration did not match the expected configuration.
+ */
+public final class ActiveMQInvalidQueueConfiguration extends ActiveMQException {
+
+   public ActiveMQInvalidQueueConfiguration() {
+      super(ActiveMQExceptionType.INVALID_QUEUE_CONFIGURATION);
+   }
+
+   public ActiveMQInvalidQueueConfiguration(String msg) {
+      super(ActiveMQExceptionType.INVALID_QUEUE_CONFIGURATION, msg);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index a264e88..1187db0 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -93,7 +93,11 @@ public class MQTTSubscriptionManager {
 
       Queue q = session.getServer().locateQueue(queue);
       if (q == null) {
-         q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
+         q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, -1, false);
+      } else {
+         if (q.isDeleteOnNoConsumers()) {
+            throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
+         }
       }
       return q;
    }
@@ -118,7 +122,7 @@ public class MQTTSubscriptionManager {
 
       String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
       AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress));
-      if (addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) {
+      if (addressInfo != null && addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) {
          throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType());
       }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 9475461..6d8cf30 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -29,6 +29,7 @@ import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.ActiveMQIncompatibleClientServerException;
 import org.apache.activemq.artemis.api.core.ActiveMQInternalErrorException;
 import org.apache.activemq.artemis.api.core.ActiveMQInvalidFilterExpressionException;
+import org.apache.activemq.artemis.api.core.ActiveMQInvalidQueueConfiguration;
 import org.apache.activemq.artemis.api.core.ActiveMQInvalidTransientQueueUseException;
 import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
@@ -386,4 +387,7 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType);
+
+   @Message(id = 119202, value = "Invalid Queue Configuration for Queue {1}, Address {0}.  Expected {2} to be {3} but was {4}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQInvalidQueueConfiguration invalidQueueConfiguration(SimpleString address, SimpleString queueName, String queuePropertyName, Object expectedValue, Object actualValue);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
index ba2a1c7..a266bff 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQServer.java
@@ -316,8 +316,27 @@ public interface ActiveMQServer extends ActiveMQComponent {
                      SimpleString user,
                      boolean durable,
                      boolean temporary,
+                     Integer maxConsumers,
+                     Boolean deleteOnNoConsumers) throws Exception;
+
+   Queue createQueue(SimpleString address,
+                     SimpleString queueName,
+                     SimpleString filter,
+                     SimpleString user,
+                     boolean durable,
+                     boolean temporary,
                      boolean autoCreated) throws Exception;
 
+   Queue createQueue(SimpleString address,
+                     SimpleString queueName,
+                     SimpleString filter,
+                     SimpleString user,
+                     boolean durable,
+                     boolean temporary,
+                     boolean autoCreated,
+                     Integer maxConsumers,
+                     Boolean deleteOnNoConsumers) throws Exception;
+
    Queue deployQueue(SimpleString address,
                      SimpleString queueName,
                      SimpleString filterString,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
index 0df5060..ab3898c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ServerSession.java
@@ -185,6 +185,14 @@ public interface ServerSession extends SecurityAuth {
 
    boolean isClosed();
 
+   Queue createQueue(SimpleString address,
+                     SimpleString name,
+                     SimpleString filterString,
+                     boolean temporary,
+                     boolean durable,
+                     Integer maxConsumers,
+                     Boolean deleteOnNoConsumers) throws Exception;
+
    void createSharedQueue(SimpleString address,
                           SimpleString name,
                           boolean durable,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 3bda134..9a0293e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -1446,6 +1446,18 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public Queue createQueue(SimpleString address,
+                            SimpleString queueName,
+                            SimpleString filter,
+                            SimpleString user,
+                            boolean durable,
+                            boolean temporary,
+                            Integer maxConsumers,
+                            Boolean deleteOnNoConsumers) throws Exception {
+      return createQueue(address, queueName, filter, user, durable, temporary, false, false, false, maxConsumers, deleteOnNoConsumers);
+   }
+
+   @Override
    public Queue createQueue(final SimpleString address,
                             final SimpleString queueName,
                             final SimpleString filterString,
@@ -1457,6 +1469,19 @@ public class ActiveMQServerImpl implements ActiveMQServer {
    }
 
    @Override
+   public Queue createQueue(SimpleString address,
+                            SimpleString queueName,
+                            SimpleString filter,
+                            SimpleString user,
+                            boolean durable,
+                            boolean temporary,
+                            boolean autoCreated,
+                            Integer maxConsumers,
+                            Boolean deleteOnNoConsumers) throws Exception {
+      return createQueue(address, queueName, filter, user, durable, temporary, false, false, autoCreated, maxConsumers, deleteOnNoConsumers);
+   }
+
+   @Override
    public void createSharedQueue(final SimpleString address,
                                  final SimpleString name,
                                  final SimpleString filterString,

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
index 37d99bb..4a7a89d 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerSessionImpl.java
@@ -500,6 +500,17 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
                             final SimpleString filterString,
                             final boolean temporary,
                             final boolean durable) throws Exception {
+      return createQueue(address, name, filterString, temporary, durable, null, null);
+   }
+
+   @Override
+   public Queue createQueue(final SimpleString address,
+                            final SimpleString name,
+                            final SimpleString filterString,
+                            final boolean temporary,
+                            final boolean durable,
+                            final Integer maxConsumers,
+                            final Boolean deleteOnNoConsumers) throws Exception {
       if (durable) {
          // make sure the user has privileges to create this queue
          securityCheck(address, CheckType.CREATE_DURABLE_QUEUE, this);
@@ -513,9 +524,9 @@ public class ServerSessionImpl implements ServerSession, FailureListener {
 
       // any non-temporary JMS destination created via this method should be marked as auto-created
       if (!temporary && ((address.toString().startsWith(ResourceNames.JMS_QUEUE) && address.equals(name)) || address.toString().startsWith(ResourceNames.JMS_TOPIC))) {
-         queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true);
+         queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, true, maxConsumers, deleteOnNoConsumers);
       } else {
-         queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary);
+         queue = server.createQueue(address, name, filterString, SimpleString.toSimpleString(getUsername()), durable, temporary, maxConsumers, deleteOnNoConsumers);
       }
 
       if (temporary) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2972e092/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index dd0098a..e99fc96 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1620,6 +1620,7 @@ public class MQTTTest extends MQTTTestSupport {
    public void testClientDisconnectedOnMaxConsumerLimitReached() throws Exception {
       Exception peerDisconnectedException = null;
       try {
+         String clientId = "test.client";
          SimpleString coreAddress = new SimpleString("foo.bar");
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
 
@@ -1627,8 +1628,10 @@ public class MQTTTest extends MQTTTestSupport {
          addressInfo.setDefaultMaxConsumers(0);
          getServer().createOrUpdateAddressInfo(addressInfo);
 
+         getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false);
+
          MQTT mqtt = createMQTTConnection();
-         mqtt.setClientId("test-mqtt");
+         mqtt.setClientId(clientId);
          mqtt.setKeepAlive((short) 2);
          final BlockingConnection connection = mqtt.blockingConnection();
          connection.connect();
@@ -1644,6 +1647,7 @@ public class MQTTTest extends MQTTTestSupport {
    public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception {
       Exception peerDisconnectedException = null;
       try {
+         String clientId = "test.mqtt";
          SimpleString coreAddress = new SimpleString("foo.bar");
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
 
@@ -1652,7 +1656,30 @@ public class MQTTTest extends MQTTTestSupport {
          getServer().createOrUpdateAddressInfo(addressInfo);
 
          MQTT mqtt = createMQTTConnection();
-         mqtt.setClientId("test-mqtt");
+         mqtt.setClientId(clientId);
+         mqtt.setKeepAlive((short) 2);
+         final BlockingConnection connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.subscribe(mqttSubscription);
+      } catch (EOFException e) {
+         peerDisconnectedException = e;
+      }
+      assertNotNull(peerDisconnectedException);
+      assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
+   }
+
+   @Test(timeout = 60 * 1000)
+   public void testClientDisconnectedWhenTryingToSubscribeToAnExistingQueueWithDeleteOnNoConsumers() throws Exception {
+      Exception peerDisconnectedException = null;
+      try {
+         String clientId = "testMqtt";
+         SimpleString coreAddress = new SimpleString("foo.bar");
+         getServer().createQueue(coreAddress, new SimpleString(clientId + "." + coreAddress), null, false, true, -1, true);
+
+         Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
+
+         MQTT mqtt = createMQTTConnection();
+         mqtt.setClientId(clientId);
          mqtt.setKeepAlive((short) 2);
          final BlockingConnection connection = mqtt.blockingConnection();
          connection.connect();