You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/05/25 18:17:42 UTC

[2/3] activemq-artemis git commit: ARTEMIS-233 Remove MQTT Address PreFix for cross protocol support

ARTEMIS-233 Remove MQTT Address PreFix for cross protocol support


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/3c7c2ed5
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/3c7c2ed5
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/3c7c2ed5

Branch: refs/heads/master
Commit: 3c7c2ed5d34d705651c6797c52029205c43b301a
Parents: a313558
Author: Martyn Taylor <mt...@redhat.com>
Authored: Fri Sep 25 14:47:23 2015 +0100
Committer: Martyn Taylor <mt...@redhat.com>
Committed: Wed May 25 17:59:00 2016 +0100

----------------------------------------------------------------------
 .../artemis/core/protocol/mqtt/MQTTUtil.java    |  10 +-
 .../integration/mqtt/imported/MQTTTest.java     | 172 ++++++++++---------
 .../mqtt/imported/MQTTTestSupport.java          |   2 +
 3 files changed, 95 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
index aa2262a..2313248 100644
--- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
+++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTUtil.java
@@ -24,6 +24,7 @@ import io.netty.handler.codec.mqtt.MqttMessageType;
 import io.netty.handler.codec.mqtt.MqttPublishVariableHeader;
 import io.netty.handler.codec.mqtt.MqttSubscribeMessage;
 import io.netty.handler.codec.mqtt.MqttTopicSubscription;
+import org.apache.activemq.artemis.api.core.Message;
 import org.apache.activemq.artemis.api.core.SimpleString;
 import org.apache.activemq.artemis.core.buffers.impl.ChannelBufferWrapper;
 import org.apache.activemq.artemis.core.server.ServerMessage;
@@ -52,8 +53,6 @@ public class MQTTUtil {
 
    public static final int MAX_MESSAGE_SIZE = 268435455;
 
-   public static final String MQTT_ADDRESS_PREFIX = "$sys.mqtt.";
-
    public static final String MQTT_RETAIN_ADDRESS_PREFIX = "$sys.mqtt.retain.";
 
    public static final String MQTT_QOS_LEVEL_KEY = "mqtt.qos.level";
@@ -67,16 +66,13 @@ public class MQTTUtil {
    public static final int DEFAULT_KEEP_ALIVE_FREQUENCY = 5000;
 
    public static String convertMQTTAddressFilterToCore(String filter) {
-      return MQTT_ADDRESS_PREFIX + swapMQTTAndCoreWildCards(filter);
+      return swapMQTTAndCoreWildCards(filter);
    }
 
    public static String convertCoreAddressFilterToMQTT(String filter) {
       if (filter.startsWith(MQTT_RETAIN_ADDRESS_PREFIX)) {
          filter = filter.substring(MQTT_RETAIN_ADDRESS_PREFIX.length(), filter.length());
       }
-      else if (filter.startsWith(MQTT_ADDRESS_PREFIX)) {
-         filter = filter.substring(MQTT_ADDRESS_PREFIX.length(), filter.length());
-      }
       return swapMQTTAndCoreWildCards(filter);
    }
 
@@ -117,6 +113,8 @@ public class MQTTUtil {
       message.setAddress(address);
       message.putBooleanProperty(new SimpleString(MQTT_MESSAGE_RETAIN_KEY), retain);
       message.putIntProperty(new SimpleString(MQTT_QOS_LEVEL_KEY), qos);
+      // For JMS Consumption
+      message.setType(Message.BYTES_TYPE);
       return message;
    }
 

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
index f15af10..257cf8f 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTest.java
@@ -16,6 +16,10 @@
  */
 package org.apache.activemq.artemis.tests.integration.mqtt.imported;
 
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
 import java.lang.reflect.Field;
 import java.net.ProtocolException;
 import java.util.ArrayList;
@@ -46,7 +50,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.vertx.java.core.impl.ConcurrentHashSet;
 
-/**
+/**
  * MQTT Test imported from ActiveMQ MQTT component.
  */
 public class MQTTTest extends MQTTTestSupport {
@@ -1001,91 +1005,93 @@ public class MQTTTest extends MQTTTestSupport {
    outstanding task to add cross protocol support.  This task should rework these tests.  The tests are included here
    and commented out to ensure ActiveMQ and Artemis tests are in sync. */
 
-   //   @Test(timeout = 60 * 1000)
-   //   public void testSendMQTTReceiveJMS() throws Exception {
-   //      doTestSendMQTTReceiveJMS("foo.*");
-   //   }
+   @Test(timeout = 60 * 1000)
+   public void testSendMQTTReceiveJMS() throws Exception {
+      doTestSendMQTTReceiveJMS("foo.*", "foo/bar");
+   }
 
-   //   public void doTestSendMQTTReceiveJMS(String destinationName) throws Exception {
-   //      final MQTTClientProvider provider = getMQTTClientProvider();
-   //      initializeConnection(provider);
-   //
-   //      // send retained message
-   //      final String RETAINED = "RETAINED";
-   //      provider.publish("foo/bah", RETAINED.getBytes(), AT_LEAST_ONCE, true);
-   //
-   //      ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
-   //      // MUST set to true to receive retained messages
-   //      activeMQConnection.setUseRetroactiveConsumer(true);
-   //      activeMQConnection.start();
-   //      Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-   //      javax.jms.Topic jmsTopic = s.createTopic(destinationName);
-   //      MessageConsumer consumer = s.createConsumer(jmsTopic);
-   //
-   //      // check whether we received retained message on JMS subscribe
-   //      ActiveMQMessage message = (ActiveMQMessage) consumer.receive(5000);
-   //      assertNotNull("Should get retained message", message);
-   //      ByteSequence bs = message.getContent();
-   //      assertEquals(RETAINED, new String(bs.data, bs.offset, bs.length));
-   //      assertTrue(message.getBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAINED_PROPERTY));
-   //
-   //      for (int i = 0; i < NUM_MESSAGES; i++) {
-   //         String payload = "Test Message: " + i;
-   //         provider.publish("foo/bah", payload.getBytes(), AT_LEAST_ONCE);
-   //         message = (ActiveMQMessage) consumer.receive(5000);
-   //         assertNotNull("Should get a message", message);
-   //         bs = message.getContent();
-   //         assertEquals(payload, new String(bs.data, bs.offset, bs.length));
-   //      }
-   //
-   //      activeMQConnection.close();
-   //      provider.disconnect();
-   //   }
+   public void doTestSendMQTTReceiveJMS(String jmsTopicAddress, String mqttAddress) throws Exception {
+      final MQTTClientProvider provider = getMQTTClientProvider();
+      initializeConnection(provider);
 
-   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
-   //   @Test(timeout = 2 * 60 * 1000)
-   //   public void testSendJMSReceiveMQTT() throws Exception {
-   //      doTestSendJMSReceiveMQTT("foo.far");
-   //   }
+      // MQTT address the messages are published to, and the retained message body
+      final String address = "jms/queue/" + mqttAddress;
+      final String RETAINED = "RETAINED";
 
-   // TODO As with other tests, this should be enabled as part of the cross protocol support with MQTT.
-   //   public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
-   //      final MQTTClientProvider provider = getMQTTClientProvider();
-   //      initializeConnection(provider);
-   //
-   //      ActiveMQConnection activeMQConnection = (ActiveMQConnection) cf.createConnection();
-   //      activeMQConnection.setUseRetroactiveConsumer(true);
-   //      activeMQConnection.start();
-   //      Session s = activeMQConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-   //      javax.jms.Topic jmsTopic = s.createTopic(destinationName);
-   //      MessageProducer producer = s.createProducer(jmsTopic);
-   //
-   //      // send retained message from JMS
-   //      final String RETAINED = "RETAINED";
-   //      TextMessage sendMessage = s.createTextMessage(RETAINED);
-   //      // mark the message to be retained
-   //      sendMessage.setBooleanProperty(RetainedMessageSubscriptionRecoveryPolicy.RETAIN_PROPERTY, true);
-   //      // MQTT QoS can be set using MQTTProtocolConverter.QOS_PROPERTY_NAME property
-   //      sendMessage.setIntProperty(MQTTProtocolConverter.QOS_PROPERTY_NAME, 0);
-   //      producer.send(sendMessage);
-   //
-   //      provider.subscribe("foo/+", AT_MOST_ONCE);
-   //      byte[] message = provider.receive(10000);
-   //      assertNotNull("Should get retained message", message);
-   //      assertEquals(RETAINED, new String(message));
-   //
-   //      for (int i = 0; i < NUM_MESSAGES; i++) {
-   //         String payload = "This is Test Message: " + i;
-   //         sendMessage = s.createTextMessage(payload);
-   //         producer.send(sendMessage);
-   //         message = provider.receive(5000);
-   //         assertNotNull("Should get a message", message);
-   //
-   //         assertEquals(payload, new String(message));
-   //      }
-   //      provider.disconnect();
-   //      activeMQConnection.close();
-   //   }
+      final byte[] payload = RETAINED.getBytes();
+
+      Connection connection = cf.createConnection();
+      // start the connection so the JMS consumer created below can receive messages
+      connection.start();
+
+      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      javax.jms.Queue jmsQueue = s.createQueue(jmsTopicAddress);
+      MessageConsumer consumer = s.createConsumer(jmsQueue);
+
+      provider.publish(address, payload, AT_LEAST_ONCE, true);
+
+      // the JMS consumer should receive the retained publish as a BytesMessage
+      BytesMessage message = (BytesMessage) consumer.receive(5000);
+      assertNotNull("Should get retained message", message);
+
+      byte[] b = new byte[payload.length];
+      message.readBytes(b);
+      assertArrayEquals(payload, b);
+
+      for (int i = 0; i < NUM_MESSAGES; i++) {
+         String p = "Test Message: " + i;
+         provider.publish(address, p.getBytes(), AT_LEAST_ONCE);
+         message = (BytesMessage) consumer.receive(5000);
+         assertNotNull("Should get a message", message);
+
+         byte[] bytePayload = new byte[p.getBytes().length];
+         message.readBytes(bytePayload);
+         assertArrayEquals(p.getBytes(), bytePayload);
+      }
+
+      connection.close();
+      provider.disconnect();
+   }
+
+//   @Test(timeout = 2 * 60 * 1000)
+//   public void testSendJMSReceiveMQTT() throws Exception {
+//      doTestSendJMSReceiveMQTT("foo.far");
+//   }
+//
+//   public void doTestSendJMSReceiveMQTT(String destinationName) throws Exception {
+//      final MQTTClientProvider provider = getMQTTClientProvider();
+//      initializeConnection(provider);
+//
+//      Connection connection = cf.createConnection();
+//      connection.start();
+//
+//      Session s = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+//      javax.jms.Queue queue = s.createQueue(destinationName);
+//      MessageProducer producer = s.createProducer(queue);
+//
+//      // send retained message from JMS
+//      final String RETAINED = "RETAINED";
+//      TextMessage sendMessage = s.createTextMessage(RETAINED);
+//      sendMessage.setIntProperty(MQTTUtil.MQTT_QOS_LEVEL_KEY, 0);
+//      producer.send(sendMessage);
+//
+//      provider.subscribe("jms/queue/foo/+", AT_MOST_ONCE);
+//      byte[] message = provider.receive(10000);
+//      assertNotNull("Should get retained message", message);
+//      assertEquals(RETAINED, new String(message));
+//
+//      for (int i = 0; i < NUM_MESSAGES; i++) {
+//         String payload = "This is Test Message: " + i;
+//         sendMessage = s.createTextMessage(payload);
+//         producer.send(sendMessage);
+//         message = provider.receive(5000);
+//         assertNotNull("Should get a message", message);
+//
+//         assertEquals(payload, new String(message));
+//      }
+//      provider.disconnect();
+//      connection.close();
+//   }
 
    @Test(timeout = 60 * 1000)
    public void testPingKeepsInactivityMonitorAlive() throws Exception {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/3c7c2ed5/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
index 61fcec0..73489af 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/mqtt/imported/MQTTTestSupport.java
@@ -125,6 +125,8 @@ public class MQTTTestSupport extends ActiveMQTestBase {
       addMQTTConnector();
       AddressSettings addressSettings = new AddressSettings();
       addressSettings.setMaxSizeBytes(999999999);
+      addressSettings.setAutoCreateJmsQueues(true);
+
       server.getAddressSettingsRepository().addMatch("#", addressSettings);
       server.start();
       server.waitForActivation(10, TimeUnit.SECONDS);