You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2016/06/21 17:01:25 UTC
activemq git commit: Adds a start of message interoperability test
for OpenWire <-> Qpid JMS
Repository: activemq
Updated Branches:
refs/heads/master 1faa4afa9 -> a95d6fd4e
Adds a start of message interoperability test for OpenWire <-> Qpid JMS
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a95d6fd4
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a95d6fd4
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a95d6fd4
Branch: refs/heads/master
Commit: a95d6fd4ebfc2733379791ea82a54abcfd05d9e9
Parents: 1faa4af
Author: Timothy Bish <ta...@gmail.com>
Authored: Tue Jun 21 13:00:55 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Tue Jun 21 13:01:19 2016 -0400
----------------------------------------------------------------------
.../transport/amqp/JMSInteroperabilityTest.java | 179 +++++++++++++++++++
1 file changed, 179 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a95d6fd4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
new file mode 100644
index 0000000..17f1702
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSInteroperabilityTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Tests message interoperability between OpenWire and AMQP (Qpid JMS) clients.
+ * <p>
+ * Each test sends a message through a connection obtained from
+ * {@code createJMSConnection()} and reads it back through a connection obtained
+ * from {@code createConnection()}; both factories are inherited from
+ * {@link JMSClientTestSupport}. The suite is parameterized and runs once for
+ * every AMQP transformer name returned by {@link #data()}.
+ */
+@RunWith(Parameterized.class)
+public class JMSInteroperabilityTest extends JMSClientTestSupport {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(JMSInteroperabilityTest.class);
+
+    /** AMQP transformer name used for the current parameterized run. */
+    private final String transformer;
+
+    /**
+     * Supplies the parameter sets for the {@link Parameterized} runner.
+     *
+     * @return one single-element row per transformer name:
+     *         {@code "jms"}, {@code "native"} and {@code "raw"}.
+     */
+    @Parameters(name="Transformer->{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {"jms"},
+            {"native"},
+            {"raw"},
+        });
+    }
+
+    /**
+     * Creates a test instance bound to one transformer.
+     *
+     * @param transformer
+     *        the transformer name later reported by {@link #getAmqpTransformer()}.
+     */
+    public JMSInteroperabilityTest(String transformer) {
+        this.transformer = transformer;
+    }
+
+    /**
+     * Always returns {@code true}.
+     * NOTE(review): presumably this makes the base class enable an OpenWire
+     * connector on the test broker - confirm in JMSClientTestSupport.
+     */
+    @Override
+    protected boolean isUseOpenWireConnector() {
+        return true;
+    }
+
+    /**
+     * @return the transformer name supplied to the constructor for this run.
+     */
+    @Override
+    protected String getAmqpTransformer() {
+        return transformer;
+    }
+
+    //----- Tests for OpenWire to Qpid JMS using ObjectMessage carrying a Map //
+
+    /**
+     * Sends an {@link ObjectMessage} whose body is a {@link HashMap} (including a
+     * {@code null} value and an empty string) from the OpenWire side and verifies
+     * that the AMQP side receives an {@link ObjectMessage} containing a
+     * {@link Map} with the same number of entries.
+     * <p>
+     * Both connections are closed in {@code finally} blocks so that a failed
+     * assertion does not leave them open for the remainder of the suite.
+     *
+     * @throws Exception if any JMS operation fails.
+     */
+    @Test
+    public void testMapMessageSendReceive() throws Exception {
+        Connection openwire = createJMSConnection();
+        try {
+            Connection amqp = createConnection();
+            try {
+                openwire.start();
+                amqp.start();
+
+                Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Destination queue = openwireSession.createQueue(getDestinationName());
+
+                MessageProducer openwireProducer = openwireSession.createProducer(queue);
+                MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+                // Create the Message
+                ObjectMessage outgoing = openwireSession.createObjectMessage();
+
+                HashMap<String, Object> outgoingMap = new HashMap<String, Object>();
+
+                outgoingMap.put("none", null);
+                outgoingMap.put("string", "test");
+                outgoingMap.put("long", 255L);
+                outgoingMap.put("empty-string", "");
+                outgoingMap.put("negative-int", -1);
+                outgoingMap.put("float", 0.12f);
+
+                outgoing.setObject(outgoingMap);
+
+                openwireProducer.send(outgoing);
+
+                // Now consume the ObjectMessage
+                Message received = amqpConsumer.receive(2000);
+                assertNotNull(received);
+                assertTrue(received instanceof ObjectMessage);
+                ObjectMessage incoming = (ObjectMessage) received;
+
+                Object incomingObject = incoming.getObject();
+                assertNotNull(incomingObject);
+                assertTrue(incomingObject instanceof Map);
+
+                // Safe: the instanceof check above guarantees a Map and only size() is read.
+                @SuppressWarnings("unchecked")
+                Map<String, Object> incomingMap = (Map<String, Object>) incomingObject;
+                assertEquals(outgoingMap.size(), incomingMap.size());
+            } finally {
+                amqp.close();
+            }
+        } finally {
+            openwire.close();
+        }
+    }
+
+    //----- Tests for OpenWire to Qpid JMS using ObjectMessage carrying a List //
+
+    /**
+     * Sends an {@link ObjectMessage} whose body is an {@link ArrayList} (including
+     * a {@code null} element and an empty string) from the OpenWire side and
+     * verifies that the AMQP side receives an {@link ObjectMessage} containing a
+     * {@link List} with the same number of elements.
+     * <p>
+     * Both connections are closed in {@code finally} blocks so that a failed
+     * assertion does not leave them open for the remainder of the suite.
+     *
+     * @throws Exception if any JMS operation fails.
+     */
+    @Test
+    public void testObjectMessageContainingList() throws Exception {
+        Connection openwire = createJMSConnection();
+        try {
+            Connection amqp = createConnection();
+            try {
+                openwire.start();
+                amqp.start();
+
+                Session openwireSession = openwire.createSession(false, Session.AUTO_ACKNOWLEDGE);
+                Session amqpSession = amqp.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+                Destination queue = openwireSession.createQueue(getDestinationName());
+
+                MessageProducer openwireProducer = openwireSession.createProducer(queue);
+                MessageConsumer amqpConsumer = amqpSession.createConsumer(queue);
+
+                // Create the Message
+                ObjectMessage outgoing = openwireSession.createObjectMessage();
+
+                ArrayList<Object> outgoingList = new ArrayList<Object>();
+
+                outgoingList.add(null);
+                outgoingList.add("test");
+                outgoingList.add(255L);
+                outgoingList.add("");
+                outgoingList.add(-1);
+                outgoingList.add(0.12f);
+
+                outgoing.setObject(outgoingList);
+
+                openwireProducer.send(outgoing);
+
+                // Now consume the ObjectMessage
+                Message received = amqpConsumer.receive(2000);
+                assertNotNull(received);
+                assertTrue(received instanceof ObjectMessage);
+                ObjectMessage incoming = (ObjectMessage) received;
+
+                Object incomingObject = incoming.getObject();
+                assertNotNull(incomingObject);
+                assertTrue(incomingObject instanceof List);
+
+                // Safe: the instanceof check above guarantees a List and only size() is read.
+                @SuppressWarnings("unchecked")
+                List<Object> incomingList = (List<Object>) incomingObject;
+                assertEquals(outgoingList.size(), incomingList.size());
+            } finally {
+                amqp.close();
+            }
+        } finally {
+            openwire.close();
+        }
+    }
+}