You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/05/25 18:17:42 UTC
[2/3] activemq-artemis git commit: ARTEMIS-233 Remove MQTT Address
PreFix for cross protocol support
ARTEMIS-233 Remove MQTT Address PreFix for cross protocol support
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3c7c2ed5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3c7c2ed5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3c7c2ed5
Branch: refs/heads/master
Commit: 3c7c2ed5d34d705651c6797c52029205c43b301a
Parents: a313558
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Sep 25 14:47:23 2015 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed May 25 17:59:00 2016 +0100
----------------------------------------------------------------------
.../artemis/core/protocol/mqtt/MQTTUtil.java | 10 +-
.../integration/mqtt/imported/MQTTTest.java | 172 ++++++++++---------
.../mqtt/imported/MQTTTestSupport.java | 2 +
3 files changed, 95 insertions(+), 89 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index aa2262a..2313248 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -52,8 +53,6 @@ public class MQTTUtil {
public static final int MAX_MESSAGE_SIZE = 268435455;
- public static final String MQTT_ADDRESS_PREFIX = "$sys.mqtt.";
-
public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
@@ -67,16 +66,13 @@ public class MQTTUtil {
public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
public static String convertMQTTAddressFilterToCore(String filter) {
- return MQTT_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
+ return swapMQTTAndCoreWildCards(filter);
}
public static String convertCoreAddressFilterToMQTT(String filter) {
if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
}
- else if (filter.startsWith(MQTT_ADDRESS_PREFIX)) {
- filter = filter.substring(MQTT_ADDRESS_PREFIX.length(), filter.length());
- }
return swapMQTTAndCoreWildCards(filter);
}
@@ -117,6 +113,8 @@ public class MQTTUtil {
message.setAddress(address);
message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
+ // For JMS Consumption
+ message.setType(Message.BYTES_TYPE);
return message;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index f15af10..257cf8f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.mqtt.imported;
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
import java.lang.reflect.Field;
import java.net.ProtocolException;
import java.util.ArrayList;
@@ -46,7 +50,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.vertx.java.core.impl.ConcurrentHashSet;
-/**
+/**QT
* MQTT Test imported from ActiveMQ MQTT component.
*/
public class MQTTTest extends MQTTTestSupport {
@@ -1001,91 +1005,93 @@ public class MQTTTest extends MQTTTestSupport {
outstanding task to add cross protocol support. This task should rework these tests. The tests are included here
and commented out to ensure ActiveMQ and Artemis tests are in sync. */
- // @Test(timeout = 60 * 1000)
- // public void testSendMQTTReceiveJMS() throws Exception {
- // doTestSendMQTTReceiveJMS("foo.*");
- // }
+ @Test(timeout = 60 * 1000)
+ public void testSendMQTTReceiveJMS() throws Exception {
+ doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
+ }
- // public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
- // final MQTTClientProvider provider = getMQTTClientProvider();
- // initializeConnection(provider);
- //
- // // send retained message
- // final String RETAINED = "RETAINED";
- // provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true);
- //
- // ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
- // // MUST set to true to receive retained messages
- // activeMQConnection.setUseRetroactiveConsumer(true);
- // activeMQConnection.start();
- // Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // javax.jms.Topic jmsTopic = s.createTopic(destinationName);
- // MessageConsumer consumer = s.createConsumer(jmsTopic);
- //
- // // check whether we received retained message on JMS subscribe
- // ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
- // assertNotNull("Should get retained message", message);
- // ByteSequence bs = message.getContent();
- // assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
- // assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
- //
- // for (int i = 0; i < NUM_MESSAGES; i++) {
- // String payload = "Test Message: " + i;
- // provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
- // message = (ActiveMQMessage) consumer.receive(5000);
- // assertNotNull("Should get a message", message);
- // bs = message.getContent();
- // assertEquals(payload, new String(bs.data, bs.offset, bs.length));
- // }
- //
- // activeMQConnection.close();
- // provider.disconnect();
- // }
+ public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception {
+ final MQTTClientProvider provider = getMQTTClientProvider();
+ initializeConnection(provider);
- // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
- // @Test(timeout = 2 * 60 * 1000)
- // public void testSendJMSReceiveMQTT() throws Exception {
- // doTestSendJMSReceiveMQTT("foo.far");
- // }
+ // MQTT topic to publish on ("jms/queue/" + mqttAddress) and the retained payload text
+ final String address = "jms/queue/" + mqttAddress;
+ final String RETAINED = "RETAINED";
- // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
- // public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
- // final MQTTClientProvider provider = getMQTTClientProvider();
- // initializeConnection(provider);
- //
- // ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
- // activeMQConnection.setUseRetroactiveConsumer(true);
- // activeMQConnection.start();
- // Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // javax.jms.Topic jmsTopic = s.createTopic(destinationName);
- // MessageProducer producer = s.createProducer(jmsTopic);
- //
- // // send retained message from JMS
- // final String RETAINED = "RETAINED";
- // TextMessage sendMessage = s.createTextMessage(RETAINED);
- // // mark the message to be retained
- // sendMessage.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
- // // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property
- // sendMessage.setIntProperty(MQTTProtocolConverter.QOS_PROPERTY_NAME, 0);
- // producer.send(sendMessage);
- //
- // provider.subscribe("foo/+", AT_MOST_ONCE);
- // byte[] message = provider.receive(10000);
- // assertNotNull("Should get retained message", message);
- // assertEquals(RETAINED, new String(message));
- //
- // for (int i = 0; i < NUM_MESSAGES; i++) {
- // String payload = "This is Test Message: " + i;
- // sendMessage = s.createTextMessage(payload);
- // producer.send(sendMessage);
- // message = provider.receive(5000);
- // assertNotNull("Should get a message", message);
- //
- // assertEquals(payload, new String(message));
- // }
- // provider.disconnect();
- // activeMQConnection.close();
- // }
+ final byte[] payload = RETAINED.getBytes();
+
+ Connection connection = cf.createConnection();
+ // start delivery and attach the JMS consumer before anything is published over MQTT
+ connection.start();
+
+ Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress);
+ MessageConsumer consumer = s.createConsumer(jmsQueue);
+
+ provider.publish(address, payload, AT_LEAST_ONCE, true);
+
+ // the retained MQTT publish must arrive on the JMS consumer as a BytesMessage
+ BytesMessage message = (BytesMessage) consumer.receive(5000);
+ assertNotNull("Should get retained message", message);
+
+ byte[] b = new byte[payload.length];
+ message.readBytes(b);
+ assertArrayEquals(payload, b);
+
+ for (int i = 0; i < NUM_MESSAGES; i++) {
+ String p = "Test Message: " + i;
+ provider.publish(address, p.getBytes(), AT_LEAST_ONCE);
+ message = (BytesMessage) consumer.receive(5000);
+ assertNotNull("Should get a message", message);
+
+ byte[] bytePayload = new byte[p.getBytes().length];
+ message.readBytes(bytePayload);
+ assertArrayEquals(p.getBytes(), bytePayload);
+ }
+
+ connection.close();
+ provider.disconnect();
+ }
+
+// @Test(timeout = 2 * 60 * 1000)
+// public void testSendJMSReceiveMQTT() throws Exception {
+// doTestSendJMSReceiveMQTT("foo.far");
+// }
+//
+// public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
+// final MQTTClientProvider provider = getMQTTClientProvider();
+// initializeConnection(provider);
+//
+// Connection connection = cf.createConnection();
+// connection.start();
+//
+// Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// javax.jms.Queue queue = s.createQueue(destinationName);
+// MessageProducer producer = s.createProducer(queue);
+//
+// // send retained message from JMS
+// final String RETAINED = "RETAINED";
+// TextMessage sendMessage = s.createTextMessage(RETAINED);
+// sendMessage.setIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, 0);
+// producer.send(sendMessage);
+//
+// provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE);
+// byte[] message = provider.receive(10000);
+// assertNotNull("Should get retained message", message);
+// assertEquals(RETAINED, new String(message));
+//
+// for (int i = 0; i < NUM_MESSAGES; i++) {
+// String payload = "This is Test Message: " + i;
+// sendMessage = s.createTextMessage(payload);
+// producer.send(sendMessage);
+// message = provider.receive(5000);
+// assertNotNull("Should get a message", message);
+//
+// assertEquals(payload, new String(message));
+// }
+// provider.disconnect();
+// connection.close();
+// }
@Test(timeout = 60 * 1000)
public void testPingKeepsInactivityMonitorAlive() throws Exception {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 61fcec0..73489af 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -125,6 +125,8 @@ public class MQTTTestSupport extends ActiveMQTestBase {
addMQTTConnector();
AddressSettings addressSettings = new AddressSettings();
addressSettings.setMaxSizeBytes(999999999);
+ addressSettings.setAutoCreateJmsQueues(true);
+
server.getAddressSettingsRepository().addMatch("#", addressSettings);
server.start();
server.waitForActivation(10, TimeUnit.SECONDS);