You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2016/12/01 15:26:41 UTC
[1/3] activemq-artemis git commit: Fix MQTT JMSSend Test
Repository: activemq-artemis
Updated Branches:
refs/heads/ARTEMIS-780 a73aa0951 -> a33a047d4
Fix MQTT JMSSend Test
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/a33a047d
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/a33a047d
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/a33a047d
Branch: refs/heads/ARTEMIS-780
Commit: a33a047d479c7c6bf58fa68afcaa22e7a8390524
Parents: 8766305
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Dec 1 12:45:15 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Dec 1 15:26:23 2016 +0000
----------------------------------------------------------------------
.../core/protocol/mqtt/MQTTPublishManager.java | 16 +++++++++++++---
.../core/protocol/mqtt/MQTTSubscriptionManager.java | 2 +-
.../tests/integration/mqtt/imported/MQTTTest.java | 11 ++++-------
3 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index fb3363f..8218208 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -83,7 +83,7 @@ public class MQTTPublishManager {
}
private void createManagementAddress() {
- managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
+ managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + state.getClientId());
}
private void createManagementQueue() throws Exception {
@@ -113,10 +113,13 @@ public class MQTTPublishManager {
if (qos == 0) {
sendServerMessage((int) message.getMessageID(), (ServerMessageImpl) message, deliveryCount, qos);
session.getServerSession().acknowledge(consumer.getID(), message.getMessageID());
- } else {
+ } else if (qos == 1 || qos == 2) {
int mqttid = outboundStore.generateMqttId(message.getMessageID(), consumer.getID());
outboundStore.publish(mqttid, message.getMessageID(), consumer.getID());
sendServerMessage(mqttid, (ServerMessageImpl) message, deliveryCount, qos);
+ } else {
+ // Client must have disconnected and its Subscription QoS cleared
+ consumer.individualCancel(message.getMessageID(), false);
}
}
}
@@ -232,7 +235,14 @@ public class MQTTPublishManager {
}
private int decideQoS(ServerMessage message, ServerConsumer consumer) {
- int subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+
+ int subscriptionQoS = -1;
+ try {
+ subscriptionQoS = session.getSubscriptionManager().getConsumerQoSLevels().get(consumer.getID());
+ } catch (NullPointerException e) {
+ // This can happen if the client disconnected during a server send.
+ return subscriptionQoS;
+ }
int qos = 2;
if (message.containsProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY)) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
index b3542d3..c4b8b94 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTSubscriptionManager.java
@@ -94,7 +94,7 @@ public class MQTTSubscriptionManager {
Queue q = session.getServer().locateQueue(queue);
if (q == null) {
- q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, true);
+ q = session.getServerSession().createQueue(new SimpleString(address), queue, RoutingType.MULTICAST, managementFilter, false, MQTTUtil.DURABLE_MESSAGES && qos >= 0, false);
} else {
if (q.isDeleteOnNoConsumers()) {
throw ActiveMQMessageBundle.BUNDLE.invalidQueueConfiguration(q.getAddress(), q.getName(), "deleteOnNoConsumers", false, true);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/a33a047d/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index c342853..58d75d8 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -1096,8 +1096,8 @@ public class MQTTTest extends MQTTTestSupport {
connection.start();
Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- javax.jms.Queue queue = s.createQueue(destinationName);
- MessageProducer producer = s.createProducer(queue);
+ javax.jms.Topic topic = s.createTopic(destinationName);
+ MessageProducer producer = s.createProducer(topic);
// send retained message from JMS
final byte[] bytes = new byte[]{1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
@@ -1626,10 +1626,7 @@ public class MQTTTest extends MQTTTestSupport {
SimpleString coreAddress = new SimpleString("foo.bar");
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
- AddressInfo addressInfo = new AddressInfo(coreAddress);
- getServer().createOrUpdateAddressInfo(addressInfo);
-
- getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, false);
+ getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, 0, false, true);
MQTT mqtt = createMQTTConnection();
mqtt.setClientId(clientId);
@@ -1675,7 +1672,7 @@ public class MQTTTest extends MQTTTestSupport {
try {
String clientId = "testMqtt";
SimpleString coreAddress = new SimpleString("foo.bar");
- getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, false);
+ getServer().createQueue(coreAddress, RoutingType.MULTICAST, new SimpleString(clientId + "." + coreAddress), null, false, true, Queue.MAX_CONSUMERS_UNLIMITED, true, true);
Topic[] mqttSubscription = new Topic[]{new Topic("foo/bar", QoS.AT_LEAST_ONCE)};
[2/3] activemq-artemis git commit: add missing import
Posted by ma...@apache.org.
add missing import
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/87663057
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/87663057
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/87663057
Branch: refs/heads/ARTEMIS-780
Commit: 876630575de81db34a80ff510c781c96e59e700c
Parents: 5fa3f45
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Dec 1 12:45:22 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Dec 1 15:26:23 2016 +0000
----------------------------------------------------------------------
.../org/apache/activemq/artemis/jms/client/ActiveMQMessage.java | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/87663057/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index e558197..4f0be81 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -44,6 +44,7 @@ import org.apache.activemq.artemis.api.core.client.ClientMessage;
import org.apache.activemq.artemis.api.core.client.ClientSession;
import org.apache.activemq.artemis.api.jms.ActiveMQJMSConstants;
import org.apache.activemq.artemis.core.message.impl.MessageInternal;
+import org.apache.activemq.artemis.core.server.RoutingType;
import org.apache.activemq.artemis.reader.MessageUtil;
import org.apache.activemq.artemis.utils.UUID;
[3/3] activemq-artemis git commit: Revert "Don't set routingType header on JMS producer as this breaks backwards compatibility"
Posted by ma...@apache.org.
Revert "Don't set routingType header on JMS producer as this breaks backwards compatibility"
This reverts commit 8338080a9be1b3ee317a766423468b39bf6bf64b.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5fa3f459
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5fa3f459
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5fa3f459
Branch: refs/heads/ARTEMIS-780
Commit: 5fa3f45982fc5410b0519884bfea35beff66ecf9
Parents: a73aa09
Author: Martyn Taylor <mt...@redhat.com>
Authored: Thu Dec 1 11:33:38 2016 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Thu Dec 1 15:26:23 2016 +0000
----------------------------------------------------------------------
.../activemq/artemis/reader/MessageUtil.java | 14 ++++++++++---
.../artemis/jms/client/ActiveMQMessage.java | 21 +++++++++++---------
.../jms/client/ActiveMQMessageConsumer.java | 7 -------
.../jms/client/ActiveMQMessageProducer.java | 3 +++
4 files changed, 26 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fa3f459/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
----------------------------------------------------------------------
diff --git a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
index 507a454..9d37cd3 100644
--- a/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
+++ b/artemis-core-client/src/main/java/org/apache/activemq/artemis/reader/MessageUtil.java
@@ -120,11 +120,19 @@ public class MessageUtil {
}
public static void clearProperties(Message message) {
+ /**
+ * JavaDoc for this method states:
+ * Clears a message's properties.
+ * The message's header fields and body are not cleared.
+ *
+ * Since the {@code Message.HDR_ROUTING_TYPE} is used for the JMSDestination header it isn't cleared
+ */
+
List<SimpleString> toRemove = new ArrayList<>();
for (SimpleString propName : message.getPropertyNames()) {
- if (!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
- propName.startsWith(JMS_)) {
+ if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
+ propName.startsWith(JMS_)) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
toRemove.add(propName);
}
}
@@ -139,7 +147,7 @@ public class MessageUtil {
for (SimpleString propName : message.getPropertyNames()) {
if ((!propName.startsWith(JMS) || propName.startsWith(JMSX) ||
- propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME)) {
+ propName.startsWith(JMS_)) && !propName.startsWith(CONNECTION_ID_PROPERTY_NAME) && !propName.equals(Message.HDR_ROUTING_TYPE)) {
set.add(propName.toString());
}
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fa3f459/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
index d30fd1b..e558197 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessage.java
@@ -201,11 +201,9 @@ public class ActiveMQMessage implements javax.jms.Message {
private long jmsDeliveryTime;
- private boolean fromQueue;
-
// Constructors --------------------------------------------------
- /*g
+ /*
* Create a new message prior to sending
*/
protected ActiveMQMessage(final byte type, final ClientSession session) {
@@ -399,8 +397,17 @@ public class ActiveMQMessage implements javax.jms.Message {
public Destination getJMSDestination() throws JMSException {
if (dest == null) {
SimpleString address = message.getAddress();
+ String prefix = "";
+ if (message.containsProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE)) {
+ RoutingType routingType = RoutingType.getType(message.getByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE));
+ if (routingType.equals(RoutingType.ANYCAST)) {
+ prefix = QUEUE_QUALIFIED_PREFIX;
+ } else if (routingType.equals(RoutingType.MULTICAST)) {
+ prefix = TOPIC_QUALIFIED_PREFIX;
+ }
+ }
- dest = address == null ? null : ActiveMQDestination.fromPrefixedName((fromQueue ? QUEUE_QUALIFIED_PREFIX : TOPIC_QUALIFIED_PREFIX) + address.toString());
+ dest = address == null ? null : ActiveMQDestination.fromPrefixedName(prefix + address.toString());
}
return dest;
@@ -777,11 +784,7 @@ public class ActiveMQMessage implements javax.jms.Message {
return message.getBodySize() == 0;
}
- // Public ---------------------------------------------------------
-
- public void setFromQueue(boolean fromQueue) {
- this.fromQueue = fromQueue;
- }
+ // Public --------------------------------------------------------
public void setIndividualAcknowledge() {
this.individualAck = true;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fa3f459/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
index ec362fa..8bc1fd8 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageConsumer.java
@@ -60,8 +60,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
private final ActiveMQDestination destination;
- private final boolean destinationIsQueue;
-
private final String selector;
private final SimpleString autoDeleteQueueName;
@@ -90,8 +88,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
this.destination = destination;
- this.destinationIsQueue = destination instanceof ActiveMQQueue;
-
this.selector = selector;
this.autoDeleteQueueName = autoDeleteQueueName;
@@ -244,9 +240,6 @@ public final class ActiveMQMessageConsumer implements QueueReceiver, TopicSubscr
} else {
coreMessage.acknowledge();
}
-
- // TODO find a more elegant way to do this
- jmsMsg.setFromQueue(destinationIsQueue);
}
return jmsMsg;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5fa3f459/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
----------------------------------------------------------------------
diff --git a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
index 47d9ff2..4c1d335 100644
--- a/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
+++ b/artemis-jms-client/src/main/java/org/apache/activemq/artemis/jms/client/ActiveMQMessageProducer.java
@@ -491,6 +491,9 @@ public class ActiveMQMessageProducer implements MessageProducer, QueueSender, To
ClientMessage coreMessage = activeMQJmsMessage.getCoreMessage();
coreMessage.putStringProperty(ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME, connID);
+ byte routingType = destination.isQueue() ? RoutingType.ANYCAST.getType() : RoutingType.MULTICAST.getType();
+ coreMessage.putByteProperty(org.apache.activemq.artemis.api.core.Message.HDR_ROUTING_TYPE, routingType);
+
try {
/**
* Using a completionListener requires wrapping using a {@link CompletionListenerWrapper},