You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/10/10 21:24:23 UTC

[1/2] activemq-artemis git commit: ARTEMIS-778 Fix MQTT tests, refactor session state

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 646c8ce7a -> c684e9c8a


ARTEMIS-778 Fix MQTT tests, refactor session state


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a0934869
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a0934869
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a0934869

Branch: refs/heads/master
Commit: a09348695ca6eb5dd2a29a76f33ef1633945c0ca
Parents: 646c8ce
Author: Martyn Taylor <mt...@redhat.com>
Authored: Mon Oct 10 11:09:43 2016 +0100
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 10 23:24:14 2016 +0200

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTProtocolHandler.java |  12 ++-
 .../core/protocol/mqtt/MQTTPublishManager.java  |  93 ++++++++--------
 .../protocol/mqtt/MQTTRetainMessageManager.java |   3 +-
 .../core/protocol/mqtt/MQTTSessionState.java    | 105 +++++++++----------
 .../protocol/mqtt/MQTTSubscriptionManager.java  |  19 ++--
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  41 +++++---
 .../integration/mqtt/imported/MQTTTest.java     |  26 +++++
 7 files changed, 169 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0934869/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
index 68648cd..5d73f57 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTProtocolHandler.java
@@ -96,7 +96,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
 
          connection.dataReceived();
 
-         MQTTUtil.logMessage(log, message, true);
+         MQTTUtil.logMessage(session.getState(), message, true);
 
          switch (message.fixedHeader().messageType()) {
             case CONNECT:
@@ -145,7 +145,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
                disconnect();
          }
       } catch (Exception e) {
-         log.warn("Error processing Control Packet, Disconnecting Client" + e.getMessage());
+         log.debug("Error processing Control Packet, Disconnecting Client", e);
          disconnect();
       }
    }
@@ -243,6 +243,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
 
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.SUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
       MqttSubAckMessage ack = new MqttSubAckMessage(header, message.variableHeader(), new MqttSubAckPayload(qos));
+      MQTTUtil.logMessage(session.getSessionState(), ack, false);
       ctx.write(ack);
       ctx.flush();
    }
@@ -255,6 +256,7 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       session.getSubscriptionManager().removeSubscriptions(message.payload().topics());
       MqttFixedHeader header = new MqttFixedHeader(MqttMessageType.UNSUBACK, false, MqttQoS.AT_MOST_ONCE, false, 0);
       MqttUnsubAckMessage m = new MqttUnsubAckMessage(header, message.variableHeader());
+      MQTTUtil.logMessage(session.getSessionState(), m, false);
       ctx.write(m);
       ctx.flush();
    }
@@ -264,7 +266,9 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
    }
 
    void handlePingreq(MqttMessage message, ChannelHandlerContext ctx) {
-      ctx.write(new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0)));
+      MqttMessage pingResp = new MqttMessage(new MqttFixedHeader(MqttMessageType.PINGRESP, false, MqttQoS.AT_MOST_ONCE, false, 0));
+      MQTTUtil.logMessage(session.getSessionState(), pingResp, false);
+      ctx.write(pingResp);
       ctx.flush();
    }
 
@@ -285,6 +289,8 @@ public class MQTTProtocolHandler extends ChannelInboundHandlerAdapter {
       MqttMessage publish = new MqttPublishMessage(header, varHeader, payload);
       this.protocolManager.invokeOutgoing(publish, connection);
 
+      MQTTUtil.logMessage(session.getSessionState(), publish, false);
+
       ctx.write(publish);
       ctx.flush();
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0934869/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 96c6bf6..fb3363f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -22,11 +22,12 @@ import java.io.UnsupportedEncodingException;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.EmptyByteBuf;
-import io.netty.handler.codec.mqtt.MqttMessageType;
+import org.apache.activemq.artemis.api.core.ActiveMQIllegalStateException;
 import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.Pair;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.server.Queue;
 import org.apache.activemq.artemis.core.server.ServerConsumer;
 import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
@@ -48,11 +49,18 @@ public class MQTTPublishManager {
 
    private final Object lock = new Object();
 
+   private MQTTSessionState state;
+
+   private MQTTSessionState.OutboundStore outboundStore;
+
    public MQTTPublishManager(MQTTSession session) {
       this.session = session;
    }
 
    synchronized void start() throws Exception {
+      this.state = session.getSessionState();
+      this.outboundStore = state.getOutboundStore();
+
       createManagementAddress();
       createManagementQueue();
       createManagementConsumer();
@@ -75,12 +83,12 @@ public class MQTTPublishManager {
    }
 
    private void createManagementAddress() {
-      String clientId = session.getSessionState().getClientId();
-      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + clientId);
+      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX +  state.getClientId());
    }
 
    private void createManagementQueue() throws Exception {
-      if (session.getServer().locateQueue(managementAddress) == null) {
+      Queue q = session.getServer().locateQueue(managementAddress);
+      if (q == null) {
          session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
       }
    }
@@ -89,10 +97,6 @@ public class MQTTPublishManager {
       return consumer == managementConsumer;
    }
 
-   private int generateMqttId(int qos) {
-      return session.getSessionState().generateId();
-   }
-
    /**
     * Since MQTT Subscriptions can over lap; a client may receive the same message twice.  When this happens the client
     * returns a PubRec or PubAck with ID.  But we need to know which consumer to ack, since we only have the ID to go on we
@@ -110,10 +114,8 @@ public class MQTTPublishManager {
             sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
             session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
          } else {
-            String consumerAddress = consumer.getQueue().getAddress().toString();
-            Integer mqttid = generateMqttId(qos);
-
-            session.getSessionState().addOutbandMessageRef(mqttid, consumerAddress, message.getMessageID(), qos);
+            int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
+            outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
             sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
          }
       }
@@ -128,9 +130,9 @@ public class MQTTPublishManager {
             serverMessage.setDurable(MQTTUtil.DURABLE_MESSAGES);
          }
 
-         if (qos < 2 || !session.getSessionState().getPubRec().contains(messageId)) {
+         if (qos < 2 || !state.getPubRec().contains(messageId)) {
             if (qos == 2)
-               session.getSessionState().getPubRec().add(messageId);
+               state.getPubRec().add(messageId);
             session.getServerSession().send(serverMessage, true);
          }
 
@@ -144,11 +146,29 @@ public class MQTTPublishManager {
    }
 
    void sendPubRelMessage(ServerMessage message) {
-      if (message.getIntProperty(MQTTUtil.MQTT_MESSAGE_TYPE_KEY) == MqttMessageType.PUBREL.value()) {
-         int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
-         MQTTMessageInfo messageInfo = new MQTTMessageInfo(message.getMessageID(), managementConsumer.getID(), message.getAddress().toString());
-         session.getSessionState().storeMessageRef(messageId, messageInfo, false);
-         session.getProtocolHandler().sendPubRel(messageId);
+      int messageId = message.getIntProperty(MQTTUtil.MQTT_MESSAGE_ID_KEY);
+      session.getProtocolHandler().sendPubRel(messageId);
+   }
+
+   void handlePubRec(int messageId) throws Exception {
+      try {
+         Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
+         if (ref != null) {
+            ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+            session.getServerSession().send(m, true);
+            session.getServerSession().acknowledge(ref.getB(), ref.getA());
+         } else {
+            session.getProtocolHandler().sendPubRel(messageId);
+         }
+      } catch (ActiveMQIllegalStateException e) {
+         log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
+      }
+   }
+
+   void handlePubComp(int messageId) throws Exception {
+      Pair<Long, Long> ref = session.getState().getOutboundStore().publishComplete(messageId);
+      if (ref != null) {
+         session.getServerSession().acknowledge(ref.getB(), ref.getA());
       }
    }
 
@@ -170,38 +190,21 @@ public class MQTTPublishManager {
       });
    }
 
-   void handlePubRec(int messageId) throws Exception {
-      MQTTMessageInfo messageRef = session.getSessionState().getMessageInfo(messageId);
-      if (messageRef != null) {
-         ServerMessage pubRel = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
-         session.getServerSession().send(pubRel, true);
-         session.getServerSession().acknowledge(messageRef.getConsumerId(), messageRef.getServerMessageId());
-         session.getProtocolHandler().sendPubRel(messageId);
-      }
-   }
-
-   void handlePubComp(int messageId) throws Exception {
-      MQTTMessageInfo messageInfo = session.getSessionState().getMessageInfo(messageId);
-
-      // Check to see if this message is stored if not just drop the packet.
-      if (messageInfo != null) {
-         session.getServerSession().acknowledge(managementConsumer.getID(), messageInfo.getServerMessageId());
-      }
-   }
-
    void handlePubRel(int messageId) {
       // We don't check to see if a PubRel existed for this message.  We assume it did and so send PubComp.
-      session.getSessionState().getPubRec().remove(messageId);
+      state.getPubRec().remove(messageId);
       session.getProtocolHandler().sendPubComp(messageId);
-      session.getSessionState().removeMessageRef(messageId);
+      state.removeMessageRef(messageId);
    }
 
    void handlePubAck(int messageId) throws Exception {
-      Pair<String, Long> pub1MessageInfo = session.getSessionState().removeOutbandMessageRef(messageId, 1);
-      if (pub1MessageInfo != null) {
-         String mqttAddress = MQTTUtil.convertCoreAddressFilterToMQTT(pub1MessageInfo.getA());
-         ServerConsumer consumer = session.getSubscriptionManager().getConsumerForAddress(mqttAddress);
-         session.getServerSession().acknowledge(consumer.getID(), pub1MessageInfo.getB());
+      try {
+         Pair<Long, Long> ref = outboundStore.publishAckd(messageId);
+         if (ref != null) {
+            session.getServerSession().acknowledge(ref.getB(), ref.getA());
+         }
+      } catch (ActiveMQIllegalStateException e) {
+         log.warn("MQTT Client(" + session.getSessionState().getClientId() + ") attempted to Ack already Ack'd message");
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0934869/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index c48f6aa..008bcd8 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -66,9 +66,8 @@ public class MQTTRetainMessageManager {
       }
    }
 
-   void addRetainedMessagesToQueue(SimpleString queueName, String address) throws Exception {
+   void addRetainedMessagesToQueue(Queue queue, String address) throws Exception {
       // Queue to add the retained messages to
-      Queue queue = session.getServer().locateQueue(queueName);
 
       // The address filter that matches all retained message queues.
       String retainAddress = MQTTUtil.convertMQTTAddressFilterToCoreRetain(address);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0934869/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
index dd7a360..194fe5e 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSessionState.java
@@ -18,6 +18,7 @@
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
@@ -48,8 +49,6 @@ public class MQTTSessionState {
 
    private boolean attached = false;
 
-   private MQTTLogger log = MQTTLogger.LOGGER;
-
    // Objects track the Outbound message references
    private Map<Integer, Pair<String, Long>> outboundMessageReferenceStore;
 
@@ -60,6 +59,8 @@ public class MQTTSessionState {
    // FIXME We should use a better mechanism for creating packet IDs.
    private AtomicInteger lastId = new AtomicInteger(0);
 
+   private final OutboundStore outboundStore = new OutboundStore();
+
    public MQTTSessionState(String clientId) {
       this.clientId = clientId;
 
@@ -73,53 +74,14 @@ public class MQTTSessionState {
       addressMessageMap = new ConcurrentHashMap<>();
    }
 
-   int generateId() {
-      lastId.compareAndSet(Short.MAX_VALUE, 1);
-      return lastId.addAndGet(1);
-   }
-
-   void addOutbandMessageRef(int mqttId, String address, long serverMessageId, int qos) {
-      synchronized (outboundLock) {
-         outboundMessageReferenceStore.put(mqttId, new Pair<>(address, serverMessageId));
-         if (qos == 2) {
-            if (reverseOutboundReferenceStore.containsKey(address)) {
-               reverseOutboundReferenceStore.get(address).put(serverMessageId, mqttId);
-            } else {
-               ConcurrentHashMap<Long, Integer> serverToMqttId = new ConcurrentHashMap<>();
-               serverToMqttId.put(serverMessageId, mqttId);
-               reverseOutboundReferenceStore.put(address, serverToMqttId);
-            }
-         }
-      }
-   }
-
-   Pair<String, Long> removeOutbandMessageRef(int mqttId, int qos) {
-      synchronized (outboundLock) {
-         Pair<String, Long> messageInfo = outboundMessageReferenceStore.remove(mqttId);
-         if (qos == 1) {
-            return messageInfo;
-         }
-
-         Map<Long, Integer> map = reverseOutboundReferenceStore.get(messageInfo.getA());
-         if (map != null) {
-            map.remove(messageInfo.getB());
-            if (map.isEmpty()) {
-               reverseOutboundReferenceStore.remove(messageInfo.getA());
-            }
-            return messageInfo;
-         }
-         return null;
-      }
+   OutboundStore getOutboundStore() {
+      return outboundStore;
    }
 
    Set<Integer> getPubRec() {
       return pubRec;
    }
 
-   Set<Integer> getPub() {
-      return pub;
-   }
-
    boolean getAttached() {
       return attached;
    }
@@ -185,16 +147,6 @@ public class MQTTSessionState {
       this.clientId = clientId;
    }
 
-   void storeMessageRef(Integer mqttId, MQTTMessageInfo messageInfo, boolean storeAddress) {
-      messageRefStore.put(mqttId, messageInfo);
-      if (storeAddress) {
-         Map<Long, Integer> addressMap = addressMessageMap.get(messageInfo.getAddress());
-         if (addressMap != null) {
-            addressMap.put(messageInfo.getServerMessageId(), mqttId);
-         }
-      }
-   }
-
    void removeMessageRef(Integer mqttId) {
       MQTTMessageInfo info = messageRefStore.remove(mqttId);
       if (info != null) {
@@ -205,7 +157,50 @@ public class MQTTSessionState {
       }
    }
 
-   MQTTMessageInfo getMessageInfo(Integer mqttId) {
-      return messageRefStore.get(mqttId);
+   public class OutboundStore {
+
+      private final HashMap<String, Integer> artemisToMqttMessageMap = new HashMap<>();
+
+      private final HashMap<Integer, Pair<Long, Long>> mqttToServerIds = new HashMap<>();
+
+      private final Object dataStoreLock = new Object();
+
+      private final AtomicInteger ids = new AtomicInteger(0);
+
+      public int generateMqttId(long serverId, long consumerId) {
+         synchronized (dataStoreLock) {
+            Integer id = artemisToMqttMessageMap.get(consumerId + ":" + serverId);
+            if (id == null) {
+               ids.compareAndSet(Short.MAX_VALUE, 1);
+               id = ids.addAndGet(1);
+            }
+            return id;
+         }
+      }
+
+      public void publish(int mqtt, long serverId, long consumerId) {
+         synchronized (dataStoreLock) {
+            artemisToMqttMessageMap.put(consumerId + ":" + serverId, mqtt);
+            mqttToServerIds.put(mqtt, new Pair(serverId, consumerId));
+         }
+      }
+
+      public Pair<Long, Long> publishAckd(int mqtt) {
+         synchronized (dataStoreLock) {
+            Pair p =  mqttToServerIds.remove(mqtt);
+            if (p != null) {
+               mqttToServerIds.remove(p.getA());
+            }
+            return p;
+         }
+      }
+
+      public Pair<Long, Long> publishReceived(int mqtt) {
+         return publishAckd(mqtt);
+      }
+
+      public Pair<Long, Long> publishComplete(int mqtt) {
+         return publishAckd(mqtt);
+      }
    }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0934869/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index ea3ab19..d894910 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -36,8 +36,6 @@ public class MQTTSubscriptionManager {
 
    private ConcurrentMap<String, ServerConsumer> consumers;
 
-   private MQTTLogger log = MQTTLogger.LOGGER;
-
    // We filter out Artemis management messages and notifications
    private SimpleString managementFilter;
 
@@ -63,7 +61,7 @@ public class MQTTSubscriptionManager {
 
    synchronized void start() throws Exception {
       for (MqttTopicSubscription subscription : session.getSessionState().getSubscriptions()) {
-         SimpleString q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
+         Queue q = createQueueForSubscription(subscription.topicName(), subscription.qualityOfService().value());
          createConsumerForSubscriptionQueue(q, subscription.topicName(), subscription.qualityOfService().value());
       }
    }
@@ -86,23 +84,23 @@ public class MQTTSubscriptionManager {
    /**
     * Creates a Queue if it doesn't already exist, based on a topic and address.  Returning the queue name.
     */
-   private SimpleString createQueueForSubscription(String topic, int qos) throws Exception {
+   private Queue createQueueForSubscription(String topic, int qos) throws Exception {
       String address = MQTTUtil.convertMQTTAddressFilterToCore(topic);
       SimpleString queue = getQueueNameForTopic(address);
 
       Queue q = session.getServer().locateQueue(queue);
       if (q == null) {
-         session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
+         q = session.getServerSession().createQueue(new SimpleString(address), queue, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0);
       }
-      return queue;
+      return q;
    }
 
    /**
     * Creates a new consumer for the queue associated with a subscription
     */
-   private void createConsumerForSubscriptionQueue(SimpleString queue, String topic, int qos) throws Exception {
+   private void createConsumerForSubscriptionQueue(Queue queue, String topic, int qos) throws Exception {
       long cid = session.getServer().getStorageManager().generateID();
-      ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue, null, false, true, -1);
+      ServerConsumer consumer = session.getServerSession().createConsumer(cid, queue.getName(), null, false, true, -1);
       consumer.setStarted(true);
 
       consumers.put(topic, consumer);
@@ -117,7 +115,7 @@ public class MQTTSubscriptionManager {
 
       session.getSessionState().addSubscription(subscription);
 
-      SimpleString q = createQueueForSubscription(topic, qos);
+      Queue q = createQueueForSubscription(topic, qos);
 
       if (s == null) {
          createConsumerForSubscriptionQueue(q, topic, qos);
@@ -171,7 +169,4 @@ public class MQTTSubscriptionManager {
       return consumerQoSLevels;
    }
 
-   ServerConsumer getConsumerForAddress(String address) {
-      return consumers.get(address);
-   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0934869/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index e6affc1..3638431 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -69,6 +69,8 @@ public class MQTTUtil {
       return swapMQTTAndCoreWildCards(filter);
    }
 
+   private static final MQTTLogger logger = MQTTLogger.LOGGER;
+
    public static String convertCoreAddressFilterToMQTT(String filter) {
       if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
          filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
@@ -148,25 +150,38 @@ public class MQTTUtil {
       return message;
    }
 
-   public static void logMessage(MQTTLogger logger, MqttMessage message, boolean inbound) {
-      StringBuilder log = inbound ? new StringBuilder("Received ") : new StringBuilder("Sent ");
+   public static void logMessage(MQTTSessionState state, MqttMessage message, boolean inbound) {
+      if (logger.isTraceEnabled()) {
 
-      if (message.fixedHeader() != null) {
-         log.append(message.fixedHeader().messageType().toString());
+         StringBuilder log = new StringBuilder("MQTT(");
 
-         if (message.variableHeader() instanceof MqttPublishVariableHeader) {
-            log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel());
-         } else if (message.variableHeader() instanceof MqttMessageIdVariableHeader) {
-            log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")");
+         if (state != null) {
+            log.append(state.getClientId());
          }
 
-         if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE) {
-            for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) {
-               log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService());
-            }
+         if (inbound) {
+            log.append("): IN << ");
+         } else {
+            log.append("): OUT >> ");
          }
 
-         logger.debug(log.toString());
+         if (message.fixedHeader() != null) {
+            log.append(message.fixedHeader().messageType().toString());
+
+            if (message.variableHeader() instanceof MqttPublishVariableHeader) {
+               log.append("(" + ((MqttPublishVariableHeader) message.variableHeader()).messageId() + ") " + message.fixedHeader().qosLevel());
+            } else if (message.variableHeader() instanceof MqttMessageIdVariableHeader) {
+               log.append("(" + ((MqttMessageIdVariableHeader) message.variableHeader()).messageId() + ")");
+            }
+
+            if (message.fixedHeader().messageType() == MqttMessageType.SUBSCRIBE) {
+               for (MqttTopicSubscription sub : ((MqttSubscribeMessage) message).payload().topicSubscriptions()) {
+                  log.append("\n\t" + sub.topicName() + " : " + sub.qualityOfService());
+               }
+            }
+
+            logger.trace(log.toString());
+         }
       }
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a0934869/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 7ea7a1e..b809df0 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -267,6 +267,32 @@ public class MQTTTest extends MQTTTestSupport {
       assertEquals(NUM_MESSAGES, MQTTOutoingInterceptor.getMessageCount());
    }
 
+   @Test(timeout = 600 * 1000)
+   public void testSendMoreThanUniqueId() throws Exception {
+      int messages = (Short.MAX_VALUE * 2) + 1;
+
+      final MQTTClientProvider publisher = getMQTTClientProvider();
+      initializeConnection(publisher);
+
+      final MQTTClientProvider subscriber = getMQTTClientProvider();
+      initializeConnection(subscriber);
+
+      int count = 0;
+      subscriber.subscribe("foo", EXACTLY_ONCE);
+      for (int i = 0; i < messages; i++) {
+         String payload = "Test Message: " + i;
+         publisher.publish("foo", payload.getBytes(), EXACTLY_ONCE);
+         byte[] message = subscriber.receive(5000);
+         assertNotNull("Should get a message + [" + i + "]", message);
+         assertEquals(payload, new String(message));
+         count++;
+      }
+
+      assertEquals(messages, count);
+      subscriber.disconnect();
+      publisher.disconnect();
+   }
+
    @Test(timeout = 60 * 1000)
    public void testSendAndReceiveLargeMessages() throws Exception {
       byte[] payload = new byte[1024 * 32];


[2/2] activemq-artemis git commit: This closes #832

Posted by cl...@apache.org.
This closes #832


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/c684e9c8
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/c684e9c8
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/c684e9c8

Branch: refs/heads/master
Commit: c684e9c8a644606f3b9dafad4f2a0e9d48f698c6
Parents: 646c8ce a093486
Author: Clebert Suconic <cl...@apache.org>
Authored: Mon Oct 10 23:24:15 2016 +0200
Committer: Clebert Suconic <cl...@apache.org>
Committed: Mon Oct 10 23:24:15 2016 +0200

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTProtocolHandler.java |  12 ++-
 .../core/protocol/mqtt/MQTTPublishManager.java  |  93 ++++++++--------
 .../protocol/mqtt/MQTTRetainMessageManager.java |   3 +-
 .../core/protocol/mqtt/MQTTSessionState.java    | 105 +++++++++----------
 .../protocol/mqtt/MQTTSubscriptionManager.java  |  19 ++--
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  41 +++++---
 .../integration/mqtt/imported/MQTTTest.java     |  26 +++++
 7 files changed, 169 insertions(+), 130 deletions(-)
----------------------------------------------------------------------