You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/27 15:40:49 UTC

[1/2] activemq-artemis git commit: ARTEMIS-193 - OpenWire protocol only works with messages received over openwire

Repository: activemq-artemis
Updated Branches:
  refs/heads/master b1f1c5a96 -> 33a5e29fb


ARTEMIS-193 - OpenWire protocol only works with messages received over openwire

Fix the address conversion between protocols so it's consistent

https://issues.apache.org/jira/browse/ARTEMIS-193


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/2f18b4cb
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/2f18b4cb
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/2f18b4cb

Branch: refs/heads/master
Commit: 2f18b4cbfc4d2576c310634847bc1a955750cfd6
Parents: b1f1c5a
Author: Andy Taylor <an...@gmail.com>
Authored: Thu Aug 27 10:23:22 2015 +0100
Committer: Andy Taylor <an...@gmail.com>
Committed: Thu Aug 27 10:23:38 2015 +0100

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 28 +++-----------------
 .../core/protocol/openwire/OpenWireUtil.java    | 20 ++++++++++++++
 .../core/protocol/openwire/amq/AMQConsumer.java |  4 +++
 .../openwire/interop/GeneralInteropTest.java    | 23 ++++++++++++++++
 4 files changed, 50 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f18b4cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 90518ec..717ca8e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -71,7 +71,6 @@ public class OpenWireMessageConverter implements MessageConverter {
    private static final String AMQ_MSG_CLUSTER = AMQ_PREFIX + "CLUSTER";
    private static final String AMQ_MSG_COMMAND_ID = AMQ_PREFIX + "COMMAND_ID";
    private static final String AMQ_MSG_DATASTRUCTURE = AMQ_PREFIX + "DATASTRUCTURE";
-   private static final String AMQ_MSG_DESTINATION = AMQ_PREFIX + "DESTINATION";
    private static final String AMQ_MSG_GROUP_ID = AMQ_PREFIX + "GROUP_ID";
    private static final String AMQ_MSG_GROUP_SEQUENCE = AMQ_PREFIX + "GROUP_SEQUENCE";
    private static final String AMQ_MSG_MESSAGE_ID = AMQ_PREFIX + "MESSAGE_ID";
@@ -253,10 +252,6 @@ public class OpenWireMessageConverter implements MessageConverter {
          dsBytes.compact();
          coreMessage.putBytesProperty(AMQ_MSG_DATASTRUCTURE, dsBytes.data);
       }
-      ActiveMQDestination dest = messageSend.getDestination();
-      ByteSequence destBytes = marshaller.marshal(dest);
-      destBytes.compact();
-      coreMessage.putBytesProperty(AMQ_MSG_DESTINATION, destBytes.data);
       String groupId = messageSend.getGroupID();
       if (groupId != null) {
          coreMessage.putStringProperty(AMQ_MSG_GROUP_ID, groupId);
@@ -269,18 +264,6 @@ public class OpenWireMessageConverter implements MessageConverter {
       midBytes.compact();
       coreMessage.putBytesProperty(AMQ_MSG_MESSAGE_ID, midBytes.data);
 
-      ActiveMQDestination origDest = messageSend.getOriginalDestination();
-      if (origDest != null) {
-         ByteSequence origDestBytes = marshaller.marshal(origDest);
-         origDestBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_ORIG_DESTINATION, origDestBytes.data);
-      }
-      TransactionId origTxId = messageSend.getOriginalTransactionId();
-      if (origTxId != null) {
-         ByteSequence origTxBytes = marshaller.marshal(origTxId);
-         origTxBytes.compact();
-         coreMessage.putBytesProperty(AMQ_MSG_ORIG_TXID, origTxBytes.data);
-      }
       ProducerId producerId = messageSend.getProducerId();
       if (producerId != null) {
          ByteSequence producerIdBytes = marshaller.marshal(producerId);
@@ -375,7 +358,7 @@ public class OpenWireMessageConverter implements MessageConverter {
    public static MessageDispatch createMessageDispatch(ServerMessage message,
                                                        int deliveryCount,
                                                        AMQConsumer consumer) throws IOException {
-      ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller());
+      ActiveMQMessage amqMessage = toAMQMessage(message, consumer.getMarshaller(), consumer.getActualDestination());
 
       MessageDispatch md = new MessageDispatch();
       md.setConsumerId(consumer.getId());
@@ -387,7 +370,7 @@ public class OpenWireMessageConverter implements MessageConverter {
       return md;
    }
 
-   private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller) throws IOException {
+   private static ActiveMQMessage toAMQMessage(ServerMessage coreMessage, WireFormat marshaller, ActiveMQDestination actualDestination) throws IOException {
       ActiveMQMessage amqMsg = null;
       byte coreType = coreMessage.getType();
       switch (coreType) {
@@ -582,12 +565,7 @@ public class OpenWireMessageConverter implements MessageConverter {
          amqMsg.setDataStructure(ds);
       }
 
-      byte[] destBytes = (byte[]) coreMessage.getObjectProperty(AMQ_MSG_DESTINATION);
-      if (destBytes != null) {
-         ByteSequence seq = new ByteSequence(destBytes);
-         ActiveMQDestination dest = (ActiveMQDestination) marshaller.unmarshal(seq);
-         amqMsg.setDestination(dest);
-      }
+      amqMsg.setDestination(OpenWireUtil.toAMQAddress(coreMessage, actualDestination));
 
       Object value = coreMessage.getObjectProperty(AMQ_MSG_GROUP_ID);
       if (value != null) {

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f18b4cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
index fbd3aec..d684761 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireUtil.java
@@ -18,11 +18,14 @@ package org.apache.activemq.artemis.core.protocol.openwire;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
 import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.core.server.ServerMessage;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
 import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
 import org.apache.activemq.artemis.core.server.ActiveMQMessageBundle;
 import org.apache.activemq.artemis.core.server.BindingQueryResult;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.util.ByteSequence;
 import org.apache.activemq.artemis.api.core.SimpleString;
 
@@ -45,6 +48,23 @@ public class OpenWireUtil {
    }
 
    /**
+    * We convert the core address to an ActiveMQ Destination. We use the actual address on the message rather than the
+    * destination set on the consumer because it may be different and the JMS spec says that it should be whatever was
+    * set on publish/send, so a divert or wildcard may mean that it's different to the destination subscribed to by the
+    * consumer.
+    */
+   public static ActiveMQDestination toAMQAddress(ServerMessage message, ActiveMQDestination actualDestination) {
+      String address = message.getAddress().toString();
+      String strippedAddress = address.replace("jms.queue.", "").replace("jms.topic.", "");
+      if (actualDestination.isQueue()) {
+         return new ActiveMQQueue(strippedAddress);
+      }
+      else {
+         return new ActiveMQTopic(strippedAddress);
+      }
+   }
+
+   /**
     * Checks to see if this destination exists.  If it does not throw an invalid destination exception.
     *
     * @param destination

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f18b4cb/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
index 789e527..40d253e 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/amq/AMQConsumer.java
@@ -370,6 +370,10 @@ public class AMQConsumer implements BrowserListener {
       session.removeConsumer(nativeId);
    }
 
+   public org.apache.activemq.command.ActiveMQDestination getActualDestination() {
+      return actualDest;
+   }
+
    private class MessagePullHandler {
 
       private long next = -1;

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/2f18b4cb/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
index b88d9ea..3f2b7b6 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/interop/GeneralInteropTest.java
@@ -69,11 +69,15 @@ public class GeneralInteropTest extends BasicOpenWireTest {
 
       assertEquals(text, textMessage.getText());
 
+      assertEquals(destination, textMessage.getJMSDestination());
+
       //map messages
       sendMapMessageUsingCoreJms(queueName);
 
       MapMessage mapMessage = (MapMessage) consumer.receive(5000);
 
+      assertEquals(destination, mapMessage.getJMSDestination());
+
       assertTrue(mapMessage.getBoolean("aboolean"));
       assertEquals((byte) 4, mapMessage.getByte("abyte"));
       byte[] bytes = mapMessage.getBytes("abytes");
@@ -105,6 +109,9 @@ public class GeneralInteropTest extends BasicOpenWireTest {
       sendStreamMessageUsingCoreJms(queueName);
 
       StreamMessage streamMessage = (StreamMessage) consumer.receive(5000);
+
+      assertEquals(destination, streamMessage.getJMSDestination());
+
       assertTrue(streamMessage.readBoolean());
       assertEquals((byte) 2, streamMessage.readByte());
 
@@ -146,6 +153,8 @@ public class GeneralInteropTest extends BasicOpenWireTest {
       sendMessageUsingCoreJms(queueName);
 
       javax.jms.Message genericMessage = consumer.receive(5000);
+
+      assertEquals(destination, genericMessage.getJMSDestination());
       String value = genericMessage.getStringProperty("stringProperty");
       assertEquals("HelloMessage", value);
       assertFalse(genericMessage.getBooleanProperty("booleanProperty"));
@@ -171,6 +180,8 @@ public class GeneralInteropTest extends BasicOpenWireTest {
       for (int i = 0; i < num; i++) {
          TextMessage textMessage = (TextMessage) consumer.receive(5000);
          assertEquals(text + i, textMessage.getText());
+
+         assertEquals(destination, textMessage.getJMSDestination());
       }
    }
 
@@ -365,12 +376,15 @@ public class GeneralInteropTest extends BasicOpenWireTest {
 
          TextMessage txtMessage = (TextMessage) coreConsumer.receive(5000);
          assertEquals(text, txtMessage.getText());
+         assertEquals(txtMessage.getJMSDestination(), queue);
 
          // map messages
          sendMapMessageUsingOpenWire();
 
          MapMessage mapMessage = (MapMessage) coreConsumer.receive(5000);
 
+         assertEquals(mapMessage.getJMSDestination(), queue);
+
          assertTrue(mapMessage.getBoolean("aboolean"));
          assertEquals((byte) 4, mapMessage.getByte("abyte"));
          byte[] bytes = mapMessage.getBytes("abytes");
@@ -392,6 +406,9 @@ public class GeneralInteropTest extends BasicOpenWireTest {
          sendObjectMessageUsingOpenWire(obj);
 
          ObjectMessage objectMessage = (ObjectMessage) coreConsumer.receive(5000);
+
+         assertEquals(objectMessage.getJMSDestination(), queue);
+
          SimpleSerializable data = (SimpleSerializable) objectMessage.getObject();
 
          assertEquals(obj.objName, data.objName);
@@ -402,6 +419,8 @@ public class GeneralInteropTest extends BasicOpenWireTest {
          sendStreamMessageUsingOpenWire(queueName);
 
          StreamMessage streamMessage = (StreamMessage) coreConsumer.receive(5000);
+
+         assertEquals(streamMessage.getJMSDestination(), queue);
          assertTrue(streamMessage.readBoolean());
          assertEquals((byte) 2, streamMessage.readByte());
 
@@ -426,6 +445,8 @@ public class GeneralInteropTest extends BasicOpenWireTest {
          sendBytesMessageUsingOpenWire(bytesData);
 
          BytesMessage bytesMessage = (BytesMessage) coreConsumer.receive(5000);
+
+         assertEquals(bytesMessage.getJMSDestination(), queue);
          byte[] rawBytes = new byte[bytesData.length];
          bytesMessage.readBytes(rawBytes);
 
@@ -437,6 +458,7 @@ public class GeneralInteropTest extends BasicOpenWireTest {
          sendMessageUsingOpenWire(queueName);
 
          javax.jms.Message genericMessage = coreConsumer.receive(5000);
+         assertEquals(genericMessage.getJMSDestination(), queue);
          String value = genericMessage.getStringProperty("stringProperty");
          assertEquals("HelloMessage", value);
          assertFalse(genericMessage.getBooleanProperty("booleanProperty"));
@@ -471,6 +493,7 @@ public class GeneralInteropTest extends BasicOpenWireTest {
 
          for (int i = 0; i < num; i++) {
             TextMessage txtMessage = (TextMessage) coreConsumer.receive(5000);
+            assertEquals(txtMessage.getJMSDestination(), queue);
             assertEquals(text + i, txtMessage.getText());
          }
       }


[2/2] activemq-artemis git commit: This closes #139 openwire interop fix

Posted by cl...@apache.org.
This closes #139 openwire interop fix


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/33a5e29f
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/33a5e29f
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/33a5e29f

Branch: refs/heads/master
Commit: 33a5e29fbcaed05a60d0e1085f59a5b3b76155b0
Parents: b1f1c5a 2f18b4c
Author: Clebert Suconic <cl...@apache.org>
Authored: Thu Aug 27 09:40:37 2015 -0400
Committer: Clebert Suconic <cl...@apache.org>
Committed: Thu Aug 27 09:40:37 2015 -0400

----------------------------------------------------------------------
 .../openwire/OpenWireMessageConverter.java      | 28 +++-----------------
 .../core/protocol/openwire/OpenWireUtil.java    | 20 ++++++++++++++
 .../core/protocol/openwire/amq/AMQConsumer.java |  4 +++
 .../openwire/interop/GeneralInteropTest.java    | 23 ++++++++++++++++
 4 files changed, 50 insertions(+), 25 deletions(-)
----------------------------------------------------------------------