You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ma...@apache.org on 2017/03/10 13:14:58 UTC
[2/2] activemq-artemis git commit: ARTEMIS-990 Dont require Perms on
MQTT mngment Q
ARTEMIS-990 Dont require Perms on MQTT mngment Q
(cherry picked from commit b33fea0d7fbc94a43d04ca66a89880442e0f91c5)
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2779ad85
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2779ad85
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2779ad85
Branch: refs/heads/1.x
Commit: 2779ad85539be3292b38a683eadb01837bf1024e
Parents: e0cd9aa
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Mar 10 10:14:19 2017 +0000
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Fri Mar 10 13:13:31 2017 +0000
----------------------------------------------------------------------
.../core/protocol/mqtt/MQTTPublishManager.java | 23 ++++++++++++--------
.../protocol/mqtt/MQTTRetainMessageManager.java | 2 +-
.../integration/mqtt/imported/MQTTTest.java | 4 ++--
3 files changed, 17 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
index 77b45ab..626916f 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTPublishManager.java
@@ -62,10 +62,6 @@ public class MQTTPublishManager {
synchronized void start() throws Exception {
this.state = session.getSessionState();
this.outboundStore = state.getOutboundStore();
-
- createManagementAddress();
- createManagementQueue();
- createManagementConsumer();
}
synchronized void stop() throws Exception {
@@ -77,7 +73,7 @@ public class MQTTPublishManager {
}
void clean() throws Exception {
- createManagementAddress();
+ SimpleString managementAddress = createManagementAddress();
Queue queue = session.getServer().locateQueue(managementAddress);
if (queue != null) {
queue.deleteQueue();
@@ -90,14 +86,14 @@ public class MQTTPublishManager {
managementConsumer.setStarted(true);
}
- private void createManagementAddress() {
- managementAddress = new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
+ private SimpleString createManagementAddress() {
+ return new SimpleString(MANAGEMENT_QUEUE_PREFIX + session.getSessionState().getClientId());
}
private void createManagementQueue() throws Exception {
Queue q = session.getServer().locateQueue(managementAddress);
if (q == null) {
- session.getServerSession().createQueue(managementAddress, managementAddress, null, false, MQTTUtil.DURABLE_MESSAGES);
+ session.getServer().createQueue(managementAddress, managementAddress, null, MQTTUtil.DURABLE_MESSAGES, false);
}
}
@@ -183,11 +179,20 @@ public class MQTTPublishManager {
session.getProtocolHandler().sendPubRel(messageId);
}
+ private SimpleString getManagementAddress() throws Exception {
+ if (managementAddress == null) {
+ managementAddress = createManagementAddress();
+ createManagementQueue();
+ createManagementConsumer();
+ }
+ return managementAddress;
+ }
+
void handlePubRec(int messageId) throws Exception {
try {
Pair<Long, Long> ref = outboundStore.publishReceived(messageId);
if (ref != null) {
- ServerMessage m = MQTTUtil.createPubRelMessage(session, managementAddress, messageId);
+ ServerMessage m = MQTTUtil.createPubRelMessage(session, getManagementAddress(), messageId);
session.getServerSession().send(m, true);
session.getServerSession().acknowledge(ref.getB(), ref.getA());
} else {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
index 70db040..7acc3b4 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTRetainMessageManager.java
@@ -49,7 +49,7 @@ public class MQTTRetainMessageManager {
Queue queue = session.getServer().locateQueue(retainAddress);
if (queue == null) {
- queue = session.getServerSession().createQueue(retainAddress, retainAddress, null, false, true);
+ queue = session.getServer().createQueue(retainAddress, retainAddress, null, true, false);
}
try (LinkedListIterator<MessageReference> iterator = queue.iterator()) {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2779ad85/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index 7cd1bf1..c211260 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTConnectionManager;
import org.apache.activemq.artemis.core.protocol.mqtt.MQTTSession;
import org.apache.activemq.artemis.tests.integration.mqtt.imported.util.Wait;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.fusesource.mqtt.client.BlockingConnection;
import org.fusesource.mqtt.client.MQTT;
import org.fusesource.mqtt.client.MQTTException;
@@ -53,7 +54,6 @@ import org.junit.Ignore;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import org.vertx.java.core.impl.ConcurrentHashSet;
/**
* QT
@@ -1711,7 +1711,7 @@ public class MQTTTest extends MQTTTestSupport {
connection2.connect();
connection2.subscribe(mqttTopic);
- Message message = connection2.receive();
+ Message message = connection2.receive(5000, TimeUnit.MILLISECONDS);
assertEquals(payload, new String(message.getPayload()));
}