You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/12/01 15:26:41 UTC

[1/3] activemq-artemis git commit: Fix MQTT JMSSend Test

Repository: activemq-artemis
Updated Branches:
  refs/heads/ARTEMIS-780 a73aa0951 -> a33a047d4


Fix MQTT JMSSend Test


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a33a047d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a33a047d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a33a047d

Branch: refs/heads/ARTEMIS-780
Commit: a33a047d479c7c6bf58fa68afcaa22e7a8390524
Parents: 8766305
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Dec 1 12:45:15 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Dec 1 15:26:23 2016 +0000

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTPublishManager.java      | 16 +++++++++++++---
 .../core/protocol/mqtt/MQTTSubscriptionManager.java |  2 +-
 .../tests/integration/mqtt/imported/MQTTTest.java   | 11 ++++-------
 3 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index fb3363f..8218208 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -83,7 +83,7 @@ public class MQTTPublishManager {
    }
 
    private void createManagementAddress() {
-      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX +  state.getClientId());
+      managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
    }
 
    private void createManagementQueue() throws Exception {
@@ -113,10 +113,13 @@ public class MQTTPublishManager {
          if (qos == 0) {
             sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
             session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
-         } else {
+         } else if (qos == 1 || qos == 2) {
             int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
             outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
             sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
+         } else {
+            // Client must have disconnected and its Subscription QoS cleared
+            consumer.individualCancel(message.getMessageID(), false);
          }
       }
    }
@@ -232,7 +235,14 @@ public class MQTTPublishManager {
    }
 
    private int decideQoS(ServerMessage message, ServerConsumer consumer) {
-      int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+
+      int subscriptionQoS = -1;
+      try {
+         subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+      } catch (NullPointerException e) {
+         // This can happen if the client disconnected during a server send.
+         return subscriptionQoS;
+      }
 
       int qos = 2;
       if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index b3542d3..c4b8b94 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -94,7 +94,7 @@ public class MQTTSubscriptionManager {
 
       Queue q = session.getServer().locateQueue(queue);
       if (q == null) {
-         q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, true);
+         q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
       } else {
          if (q.isDeleteOnNoConsumers()) {
             throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index c342853..58d75d8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1096,8 +1096,8 @@ public class MQTTTest extends MQTTTestSupport {
       connection.start();
 
       Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-      javax.jms.Queue queue = s.createQueue(destinationName);
-      MessageProducer producer = s.createProducer(queue);
+      javax.jms.Topic topic = s.createTopic(destinationName);
+      MessageProducer producer = s.createProducer(topic);
 
       // send retained message from JMS
       final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
@@ -1626,10 +1626,7 @@ public class MQTTTest extends MQTTTestSupport {
          SimpleString coreAddress = new SimpleString("foo.bar");
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
 
-         AddressInfo addressInfo = new AddressInfo(coreAddress);
-         getServer().createOrUpdateAddressInfo(addressInfo);
-
-         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false);
+         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, true);
 
          MQTT mqtt = createMQTTConnection();
          mqtt.setClientId(clientId);
@@ -1675,7 +1672,7 @@ public class MQTTTest extends MQTTTestSupport {
       try {
          String clientId = "testMqtt";
          SimpleString coreAddress = new SimpleString("foo.bar");
-         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, false);
+         getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true);
 
          Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
 


[2/3] activemq-artemis git commit: add missing import

Posted by ma...@apache.org.
add missing import


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/87663057
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/87663057
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/87663057

Branch: refs/heads/ARTEMIS-780
Commit: 876630575de81db34a80ff510c781c96e59e700c
Parents: 5fa3f45
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Dec 1 12:45:22 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Dec 1 15:26:23 2016 +0000

----------------------------------------------------------------------
 .../org/apache/activemq/artemis/jms/client/ActiveMQMessage.java     | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/87663057/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index e558197..4f0be81 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
 import org.apache.activemq.artemis.api.core.client.ClientSession;
 import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
 import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.server.RoutingType;
 import org.apache.activemq.artemis.reader.MessageUtil;
 import org.apache.activemq.artemis.utils.UUID;
 


[3/3] activemq-artemis git commit: Revert "Don't set routingType header on JMS producer as this breaks backwards compatibility"

Posted by ma...@apache.org.
Revert "Don't set routingType header on JMS producer as this breaks backwards compatibility"

This reverts commit 8338080a9be1b3ee317a766423468b39bf6bf64b.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5fa3f459
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5fa3f459
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5fa3f459

Branch: refs/heads/ARTEMIS-780
Commit: 5fa3f45982fc5410b0519884bfea35beff66ecf9
Parents: a73aa09
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Dec 1 11:33:38 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Dec 1 15:26:23 2016 +0000

----------------------------------------------------------------------
 .../activemq/artemis/reader/MessageUtil.java    | 14 ++++++++++---
 .../artemis/jms/client/ActiveMQMessage.java     | 21 +++++++++++---------
 .../jms/client/ActiveMQMessageConsumer.java     |  7 -------
 .../jms/client/ActiveMQMessageProducer.java     |  3 +++
 4 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fa3f459/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 507a454..9d37cd3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -120,11 +120,19 @@ public class MessageUtil {
    }
 
    public static void clearProperties(Message message) {
+      /**
+       * JavaDoc for this method states:
+       *    Clears a message's properties.
+       *    The message's header fields and body are not cleared.
+       *
+       * Since the {@code Message.HDR_ROUTING_TYPE} is used for the JMSDestination header it isn't cleared
+       */
+
       List<SimpleString> toRemove = new ArrayList<>();
 
       for (SimpleString propName : message.getPropertyNames()) {
-         if (!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
-            propName.startsWith(JMS_)) {
+         if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
+            propName.startsWith(JMS_)) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
             toRemove.add(propName);
          }
       }
@@ -139,7 +147,7 @@ public class MessageUtil {
 
       for (SimpleString propName : message.getPropertyNames()) {
          if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
-            propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME)) {
+            propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
             set.add(propName.toString());
          }
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fa3f459/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index d30fd1b..e558197 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -201,11 +201,9 @@ public class ActiveMQMessage implements javax.jms.Message {
 
    private long jmsDeliveryTime;
 
-   private boolean fromQueue;
-
    // Constructors --------------------------------------------------
 
-   /*g
+   /*
     * Create a new message prior to sending
     */
    protected ActiveMQMessage(final byte type, final ClientSession session) {
@@ -399,8 +397,17 @@ public class ActiveMQMessage implements javax.jms.Message {
    public Destination getJMSDestination() throws JMSException {
       if (dest == null) {
          SimpleString address = message.getAddress();
+         String prefix = "";
+         if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
+            RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
+            if (routingType.equals(RoutingType.ANYCAST)) {
+               prefix = QUEUE_QUALIFIED_PREFIX;
+            } else if (routingType.equals(RoutingType.MULTICAST)) {
+               prefix = TOPIC_QUALIFIED_PREFIX;
+            }
+         }
 
-         dest = address == null ? null : ActiveMQDestination.fromPrefixedName((fromQueue ? QUEUE_QUALIFIED_PREFIX : TOPIC_QUALIFIED_PREFIX) + address.toString());
+         dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());
       }
 
       return dest;
@@ -777,11 +784,7 @@ public class ActiveMQMessage implements javax.jms.Message {
       return message.getBodySize() == 0;
    }
 
-   // Public ---------------------------------------------------------
-
-   public void setFromQueue(boolean fromQueue) {
-      this.fromQueue = fromQueue;
-   }
+   // Public --------------------------------------------------------
 
    public void setIndividualAcknowledge() {
       this.individualAck = true;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fa3f459/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index ec362fa..8bc1fd8 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -60,8 +60,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
 
    private final ActiveMQDestination destination;
 
-   private final boolean destinationIsQueue;
-
    private final String selector;
 
    private final SimpleString autoDeleteQueueName;
@@ -90,8 +88,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
 
       this.destination = destination;
 
-      this.destinationIsQueue = destination instanceof ActiveMQQueue;
-
       this.selector = selector;
 
       this.autoDeleteQueueName = autoDeleteQueueName;
@@ -244,9 +240,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
             } else {
                coreMessage.acknowledge();
             }
-
-            // TODO find a more elegant way to do this
-            jmsMsg.setFromQueue(destinationIsQueue);
          }
 
          return jmsMsg;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fa3f459/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 47d9ff2..4c1d335 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -491,6 +491,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
       ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
       coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
 
+      byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
+      coreMessage.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, routingType);
+
       try {
          /**
           * Using a completionListener requires wrapping using a {@link CompletionListenerWrapper},