You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/11/07 16:36:47 UTC

[27/50] [abbrv] activemq-artemis git commit: MQTT Handle ANYCAST addresses

MQTT Handle ANYCAST addresses


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/0772b547
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/0772b547
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/0772b547

Branch: refs/heads/ARTEMIS-780
Commit: 0772b5478774fef25534a7c9f060c65decf959f6
Parents: be04eac
Author: Martyn Taylor <mt...@redhat.com>
Authored: Tue Nov 1 12:28:06 2016 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Nov 7 11:28:07 2016 -0500

----------------------------------------------------------------------
 .../artemis/api/core/ActiveMQExceptionType.java |  6 ++++
 ...ActiveMQUnexpectedRoutingTypeForAddress.java | 31 ++++++++++++++++++++
 .../protocol/mqtt/MQTTSubscriptionManager.java  | 17 ++++++++---
 .../core/server/ActiveMQMessageBundle.java      |  5 ++++
 .../integration/mqtt/imported/MQTTTest.java     | 24 +++++++++++++++
 5 files changed, 79 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
index 0221562..309a8c4 100644
--- a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQExceptionType.java
@@ -219,6 +219,12 @@ public enum ActiveMQExceptionType {
       public ActiveMQException createException(String msg) {
          return new ActiveMQQueueMaxConsumerLimitReached(msg);
       }
+   },
+   UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS(215) {
+      @Override
+      public ActiveMQException createException(String msg) {
+         return new ActiveMQUnexpectedRoutingTypeForAddress(msg);
+      }
    };
 
    private static final Map<Integer, ActiveMQExceptionType> TYPE_MAP;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java
----------------------------------------------------------------------
diff --git a/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java
new file mode 100644
index 0000000..1bd7ecd
--- /dev/null
+++ b/artemis-commons/src/main/java/org/apache/activemq/artemis/api/core/ActiveMQUnexpectedRoutingTypeForAddress.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.api.core;
+
+/**
+ * An operation failed because the address does not have the expected routing type.
+ */
+public final class ActiveMQUnexpectedRoutingTypeForAddress extends ActiveMQException {
+   // Both constructors tag the exception with UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS, the type whose factory creates this class.
+   public ActiveMQUnexpectedRoutingTypeForAddress() {
+      super(ActiveMQExceptionType.UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS);
+   }
+
+   public ActiveMQUnexpectedRoutingTypeForAddress(String msg) {
+      super(ActiveMQExceptionType.UNEXPECTED_ROUTING_TYPE_FOR_ADDRESS, msg);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index d894910..a264e88 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -25,8 +25,10 @@ import java.util.concurrent.ConcurrentMap;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
 import org.apache.activemq.artemis.api.core.FilterConstants;
 import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 
 public class MQTTSubscriptionManager {
 
@@ -61,7 +63,8 @@ public class MQTTSubscriptionManager {
 
    synchronized void start() throws Exception {
       for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
-         Queue q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
+         String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(subscription.topicName());
+         Queue q = createQueueForSubscription(coreAddress, subscription.qualityOfService().value());
          createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
       }
    }
@@ -84,8 +87,8 @@ public class MQTTSubscriptionManager {
    /**
     * Creates a Queue if it doesn't already exist, based on a topic and address.  Returning the queue name.
     */
-   private Queue createQueueForSubscription(String topic, int qos) throws Exception {
-      String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
+   private Queue createQueueForSubscription(String address, int qos) throws Exception {
+
       SimpleString queue = getQueueNameForTopic(address);
 
       Queue q = session.getServer().locateQueue(queue);
@@ -113,9 +116,15 @@ public class MQTTSubscriptionManager {
       int qos = subscription.qualityOfService().value();
       String topic = subscription.topicName();
 
+      String coreAddress = MQTTUtil.convertMQTTAddressFilterToCore(topic);
+      AddressInfo addressInfo = session.getServer().getAddressInfo(new SimpleString(coreAddress));
+      if (addressInfo.getRoutingType() != AddressInfo.RoutingType.MULTICAST) {
+         throw ActiveMQMessageBundle.BUNDLE.unexpectedRoutingTypeForAddress(new SimpleString(coreAddress), AddressInfo.RoutingType.MULTICAST, addressInfo.getRoutingType());
+      }
+
       session.getSessionState().addSubscription(subscription);
 
-      Queue q = createQueueForSubscription(topic, qos);
+      Queue q = createQueueForSubscription(coreAddress, qos);
 
       if (s == null) {
          createConsumerForSubscriptionQueue(q, topic, qos);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
index 769d183..9475461 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/ActiveMQMessageBundle.java
@@ -35,11 +35,13 @@ import org.apache.activemq.artemis.api.core.ActiveMQQueueExistsException;
 import org.apache.activemq.artemis.api.core.ActiveMQQueueMaxConsumerLimitReached;
 import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
 import org.apache.activemq.artemis.api.core.ActiveMQSessionCreationException;
+import org.apache.activemq.artemis.api.core.ActiveMQUnexpectedRoutingTypeForAddress;
 import org.apache.activemq.artemis.api.core.DiscoveryGroupConfiguration;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.postoffice.Binding;
 import org.apache.activemq.artemis.core.protocol.core.impl.wireformat.ReplicationSyncFileMessage;
 import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.server.impl.AddressInfo;
 import org.jboss.logging.Messages;
 import org.jboss.logging.annotations.Cause;
 import org.jboss.logging.annotations.Message;
@@ -381,4 +383,7 @@ public interface ActiveMQMessageBundle {
 
    @Message(id = 119200, value = "Maximum Consumer Limit Reached on Queue:(address={0},queue={1})", format = Message.Format.MESSAGE_FORMAT)
    ActiveMQQueueMaxConsumerLimitReached maxConsumerLimitReachedForQueue(SimpleString address, SimpleString queueName);
+
+   @Message(id = 119201, value = "Expected Routing Type {1} but found {2} for address {0}", format = Message.Format.MESSAGE_FORMAT)
+   ActiveMQUnexpectedRoutingTypeForAddress unexpectedRoutingTypeForAddress(SimpleString address, AddressInfo.RoutingType expectedRoutingType, AddressInfo.RoutingType actualRoutingType);
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/0772b547/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 6406955..dd0098a 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1639,4 +1639,28 @@ public class MQTTTest extends MQTTTestSupport {
       assertNotNull(peerDisconnectedException);
       assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
    }
+
+   @Test(timeout = 60 * 1000)
+   public void testClientDisconnectedWhenTryingToSubscribeToAnAnycastAddress() throws Exception {
+      Exception peerDisconnectedException = null;
+      try {
+         SimpleString coreAddress = new SimpleString("foo.bar");
+         Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
+         // Register "foo.bar" on the server as ANYCAST before subscribing (presumably the core form of topic "foo/bar" -- confirm).
+         AddressInfo addressInfo = new AddressInfo(coreAddress);
+         addressInfo.setRoutingType(AddressInfo.RoutingType.ANYCAST);
+         getServer().createOrUpdateAddressInfo(addressInfo);
+         // Connect a blocking MQTT client and subscribe; the subscribe is expected to fail with an EOFException.
+         MQTT mqtt = createMQTTConnection();
+         mqtt.setClientId("test-mqtt");
+         mqtt.setKeepAlive((short) 2);
+         final BlockingConnection connection = mqtt.blockingConnection();
+         connection.connect();
+         connection.subscribe(mqttSubscription);
+      } catch (EOFException e) {
+         peerDisconnectedException = e;
+      }
+      assertNotNull(peerDisconnectedException); // fails the test if no EOFException was caught above
+      assertTrue(peerDisconnectedException.getMessage().contains("Peer disconnected"));
+   }
 }