You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2013/05/18 18:39:35 UTC
svn commit: r1484143 - in /activemq/trunk: activemq-unit-tests/pom.xml
activemq-unit-tests/src/test/java/org/apache/activemq/conversions/
activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
pom.xml
Author: chirino
Date: Sat May 18 16:39:35 2013
New Revision: 1484143
URL: http://svn.apache.org/r1484143
Log:
Fixes AMQ-4544: Can't send MQTT message to AMQP endpoints
Added:
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/
activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
Modified:
activemq/trunk/activemq-unit-tests/pom.xml
activemq/trunk/pom.xml
Modified: activemq/trunk/activemq-unit-tests/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/pom.xml?rev=1484143&r1=1484142&r2=1484143&view=diff
==============================================================================
--- activemq/trunk/activemq-unit-tests/pom.xml (original)
+++ activemq/trunk/activemq-unit-tests/pom.xml Sat May 18 16:39:35 2013
@@ -156,6 +156,12 @@
<!-- Testing Dependencies -->
<!-- =============================== -->
<dependency>
+ <groupId>org.apache.qpid</groupId>
+ <artifactId>qpid-amqp-1-0-client-jms</artifactId>
+ <version>${qpid-jms-version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<scope>test</scope>
Added: activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java?rev=1484143&view=auto
==============================================================================
--- activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java (added)
+++ activemq/trunk/activemq-unit-tests/src/test/java/org/apache/activemq/conversions/AmqpAndMqttTest.java Sat May 18 16:39:35 2013
@@ -0,0 +1,138 @@
+package org.apache.activemq.conversions;
+
+import org.apache.activemq.CombinationTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.apache.qpid.amqp_1_0.jms.impl.TopicImpl;
+import org.fusesource.mqtt.client.BlockingConnection;
+import org.fusesource.mqtt.client.MQTT;
+import org.fusesource.mqtt.client.QoS;
+
+import javax.jms.*;
+import java.io.UnsupportedEncodingException;
+import java.util.Arrays;
+
+/**
+ * Verifies that a message published by an MQTT client reaches a JMS consumer
+ * connected through the AMQP transport of the same broker (AMQ-4544).
+ */
+public class AmqpAndMqttTest extends CombinationTestSupport {
+
+    protected BrokerService broker;
+    private TransportConnector amqpConnector;
+    private TransportConnector mqttConnector;
+
+    /** Creates and starts the broker, blocking until it reports started. */
+    @Override
+    protected void setUp() throws Exception {
+        super.setUp();
+        broker = createBroker();
+        broker.start();
+        broker.waitUntilStarted();
+    }
+
+    /** Stops the broker; {@code super.tearDown()} runs even if stopping fails. */
+    @Override
+    protected void tearDown() throws Exception {
+        try {
+            if (broker != null) {
+                broker.stop();
+                broker.waitUntilStopped();
+                broker = null;
+            }
+        } finally {
+            super.tearDown();
+        }
+    }
+
+    /**
+     * Builds a non-persistent broker with one AMQP and one MQTT connector.
+     * Both connectors are registered on port 0; the tests read the actual
+     * port back from each connector's connect URI.
+     */
+    protected BrokerService createBroker() throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        amqpConnector = broker.addConnector("amqp://0.0.0.0:0");
+        mqttConnector = broker.addConnector("mqtt://0.0.0.0:0");
+        return broker;
+    }
+
+    /**
+     * Publishes a payload to MQTT topic {@code FOO} and asserts that the AMQP
+     * consumer on {@code topic://FOO} receives it as a {@link BytesMessage}
+     * with identical bytes.
+     */
+    public void testFromMqttToAmqp() throws Exception {
+        Connection amqp = createAmqpConnection();
+        try {
+            Session session = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            MessageConsumer consumer = session.createConsumer(session.createTopic("topic://FOO"));
+
+            byte[] payload = bytes("Hello World");
+            final BlockingConnection mqtt = createMQTTConnection().blockingConnection();
+            mqtt.connect();
+            try {
+                mqtt.publish("FOO", payload, QoS.AT_LEAST_ONCE, false);
+            } finally {
+                // Release the MQTT connection even when the publish fails.
+                mqtt.disconnect();
+            }
+
+            Message msg = consumer.receive(1000 * 5);
+            assertNotNull("No message arrived on the AMQP consumer within 5 seconds", msg);
+            assertTrue("Expected a BytesMessage but received " + msg.getClass().getName(),
+                    msg instanceof BytesMessage);
+
+            BytesMessage bmsg = (BytesMessage) msg;
+            byte[] actual = new byte[(int) bmsg.getBodyLength()];
+            bmsg.readBytes(actual);
+            assertTrue("Payload received over AMQP differs from the one sent over MQTT",
+                    Arrays.equals(actual, payload));
+        } finally {
+            // Close the AMQP connection even when an assertion above fails.
+            amqp.close();
+        }
+    }
+
+    /** Encodes {@code value} as UTF-8 bytes. */
+    private byte[] bytes(String value) {
+        try {
+            return value.getBytes("UTF-8");
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns an MQTT client configured for the broker's MQTT connector on
+     * localhost, limited to one connect attempt and no reconnect attempts.
+     */
+    protected MQTT createMQTTConnection() throws Exception {
+        MQTT mqtt = new MQTT();
+        mqtt.setConnectAttemptsMax(1);
+        mqtt.setReconnectAttemptsMax(0);
+        mqtt.setHost("localhost", mqttConnector.getConnectUri().getPort());
+        return mqtt;
+    }
+
+    /**
+     * Opens and starts a JMS connection to the broker's AMQP connector on
+     * localhost. Asynchronous connection errors are printed to stderr.
+     */
+    public Connection createAmqpConnection() throws Exception {
+        final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", amqpConnector.getConnectUri().getPort(), "admin", "password");
+        final Connection connection = factory.createConnection();
+        connection.setExceptionListener(new ExceptionListener() {
+            @Override
+            public void onException(JMSException exception) {
+                exception.printStackTrace();
+            }
+        });
+        connection.start();
+        return connection;
+    }
+
+}
Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1484143&r1=1484142&r2=1484143&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Sat May 18 16:39:35 2013
@@ -65,7 +65,7 @@
<geronimo-version>1.0</geronimo-version>
<hadoop-version>1.0.0</hadoop-version>
<hawtbuf-version>1.9</hawtbuf-version>
- <hawtdispatch-version>1.17-SNAPSHOT</hawtdispatch-version>
+ <hawtdispatch-version>1.17</hawtdispatch-version>
<howl-version>0.1.8</howl-version>
<hsqldb-version>1.8.0.12</hsqldb-version>
<httpclient-version>4.2.3</httpclient-version>
@@ -94,7 +94,7 @@
<org-apache-derby-version>10.9.1.0</org-apache-derby-version>
<org.osgi.core-version>4.3.1</org.osgi.core-version>
<p2psockets-version>1.1.2</p2psockets-version>
- <qpid-proton-version>0.3.0-fuse-2</qpid-proton-version>
+ <qpid-proton-version>0.3.0-fuse-3</qpid-proton-version>
<qpid-jms-version>0.20</qpid-jms-version>
<regexp-version>1.3</regexp-version>
<rome-version>1.0</rome-version>