You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/18 18:39:35 UTC

svn commit: r1484143 - in /activemq/trunk: activemq-unit-tests/pom.xml activemq-unit-tests/src/test/java/org/apache/activemq/conversions/ activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java pom.xml

Author: chirino
Date: Sat May 18 16:39:35 2013
New Revision: 1484143

URL: http://svn.apache.org/r1484143
Log:
Fixes AMQ-4544: Cant send MQTT message to AMQP endpoints

Added:
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/
    activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
Modified:
    activemq/trunk/activemq-unit-tests/pom.xml
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-unit-tests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/pom.xml?rev=1484143&r1=1484142&r2=1484143&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/pom.xml (original)
+++ activemq/trunk/activemq-unit-tests/pom.xml Sat May 18 16:39:35 2013
@@ -156,6 +156,12 @@
     <!-- Testing Dependencies            -->
     <!-- =============================== -->
     <dependency>
+      <groupId>org.apache.qpid</groupId>
+      <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+      <version>${qpid-jms-version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
       <groupId>org.apache.xbean</groupId>
       <artifactId>xbean-spring</artifactId>
       <scope>test</scope>

Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java?rev=1484143&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java Sat May 18 16:39:35 2013
@@ -0,0 +1,104 @@
+package org.apache.activemq.conversions;
+
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+
+import javax.jms.*;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+/**
+ */
+public class AmqpAndMqttTest extends CombinationTestSupport {
+
+    protected BrokerService broker;
+    private TransportConnector amqpConnector;
+    private TransportConnector mqttConnector;
+
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    @Override
+    protected void tearDown() throws Exception {
+        if( broker!=null ) {
+            broker.stop();
+            broker.waitUntilStopped();
+            broker = null;
+        }
+        super.tearDown();
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
+        mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
+        return broker;
+    }
+
+
+    public void testFromMqttToAmqp() throws Exception {
+        Connection amqp = createAmqpConnection();
+        Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO"));
+
+        final BlockingConnection mqtt = createMQTTConnection().blockingConnection();
+        mqtt.connect();
+        byte[] payload = bytes("Hello World");
+        mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false);
+        mqtt.disconnect();
+
+        Message msg = consumer.receive(1000 * 5);
+        assertNotNull(msg);
+        assertTrue(msg instanceof BytesMessage);
+
+        BytesMessage bmsg = (BytesMessage) msg;
+        byte[] actual = new byte[(int) bmsg.getBodyLength()];
+        bmsg.readBytes(actual);
+        assertTrue(Arrays.equals(actual, payload));
+        amqp.close();
+    }
+
+    private byte[] bytes(String value) {
+        try {
+            return value.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+
+    protected MQTT createMQTTConnection() throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setConnectAttemptsMax(1);
+        mqtt.setReconnectAttemptsMax(0);
+        mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
+        return mqtt;
+    }
+
+    public Connection createAmqpConnection() throws Exception {
+        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", amqpConnector.getConnectUri().getPort(), "admin", "password");
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
+            }
+        });
+        connection.start();
+        return connection;
+    }
+
+}

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1484143&r1=1484142&r2=1484143&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Sat May 18 16:39:35 2013
@@ -65,7 +65,7 @@
     <geronimo-version>1.0</geronimo-version>
     <hadoop-version>1.0.0</hadoop-version>
     <hawtbuf-version>1.9</hawtbuf-version>
-    <hawtdispatch-version>1.17-SNAPSHOT</hawtdispatch-version>
+    <hawtdispatch-version>1.17</hawtdispatch-version>
     <howl-version>0.1.8</howl-version>
     <hsqldb-version>1.8.0.12</hsqldb-version>
     <httpclient-version>4.2.3</httpclient-version>
@@ -94,7 +94,7 @@
     <org-apache-derby-version>10.9.1.0</org-apache-derby-version>
     <org.osgi.core-version>4.3.1</org.osgi.core-version>
     <p2psockets-version>1.1.2</p2psockets-version>
-    <qpid-proton-version>0.3.0-fuse-2</qpid-proton-version>
+    <qpid-proton-version>0.3.0-fuse-3</qpid-proton-version>
     <qpid-jms-version>0.20</qpid-jms-version>
     <regexp-version>1.3</regexp-version>
     <rome-version>1.0</rome-version>