You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/14 14:25:23 UTC

[2/2] activemq-artemis git commit: ARTEMIS-888 - AMQP headers aren't always set

ARTEMIS-888 - AMQP headers aren't always set

https://issues.apache.org/jira/browse/ARTEMIS-888


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d471f6b1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d471f6b1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d471f6b1

Branch: refs/heads/master
Commit: d471f6b15fab7d7afad8e34635869df18ac0cef4
Parents: d49a517
Author: Andy Taylor <an...@gmail.com>
Authored: Wed Dec 14 13:43:19 2016 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 14 09:25:10 2016 -0500

----------------------------------------------------------------------
 .../message/JMSMappingOutboundTransformer.java  |  38 ++---
 .../transport/amqp/client/AmqpMessage.java      |  19 +++
 .../integration/amqp/AmqpSendReceiveTest.java   | 157 +++++++++++++++++++
 3 files changed, 185 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d471f6b1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index 2fa7145..baec5f9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -54,7 +54,6 @@ import java.util.Set;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
-import javax.jms.Message;
 import javax.jms.MessageEOFException;
 import javax.jms.Queue;
 import javax.jms.TemporaryQueue;
@@ -131,7 +130,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
       }
 
       long messageFormat = 0;
-      Header header = null;
+      Header header = new Header();
       Properties properties = null;
       Map<Symbol, Object> daMap = null;
       Map<Symbol, Object> maMap = null;
@@ -140,19 +139,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
 
       Section body = convertBody(message);
 
-      if (message.getInnerMessage().isDurable()) {
-         if (header == null) {
-            header = new Header();
-         }
-         header.setDurable(true);
-      }
+      header.setDurable(message.getInnerMessage().isDurable());
+
       byte priority = (byte) message.getJMSPriority();
-      if (priority != Message.DEFAULT_PRIORITY) {
-         if (header == null) {
-            header = new Header();
-         }
-         header.setPriority(UnsignedByte.valueOf(priority));
-      }
+
+      header.setPriority(UnsignedByte.valueOf(priority));
+
       String type = message.getJMSType();
       if (type != null) {
          if (properties == null) {
@@ -160,6 +152,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
          }
          properties.setSubject(type);
       }
+
       String messageId = message.getJMSMessageID();
       if (messageId != null) {
          if (properties == null) {
@@ -211,9 +204,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
             ttl = 1;
          }
 
-         if (header == null) {
-            header = new Header();
-         }
          header.setTtl(new UnsignedInteger((int) ttl));
 
          if (properties == null) {
@@ -237,9 +227,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
                int amqpDeliveryCount = message.getDeliveryCount() - 1;
                if (amqpDeliveryCount > 0) {
-                  if (header == null) {
-                     header = new Header();
-                  }
                   header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
                }
                continue;
@@ -277,15 +264,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                // skip..internal use only
                continue;
             } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
-               if (header == null) {
-                  header = new Header();
-               }
                header.setFirstAcquirer(message.getBooleanProperty(key));
                continue;
             } else if (key.equals(JMS_AMQP_HEADER)) {
-               if (header == null) {
-                  header = new Header();
-               }
                continue;
             } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
                if (properties == null) {
@@ -365,9 +346,8 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
       EncoderImpl encoder = tlsCodec.get().encoder;
       encoder.setByteBuffer(buffer);
 
-      if (header != null) {
-         encoder.writeObject(header);
-      }
+      encoder.writeObject(header);
+
       if (daMap != null) {
          encoder.writeObject(new DeliveryAnnotations(daMap));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d471f6b1/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index f7a9364..5cf2c0a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -402,6 +402,25 @@ public class AmqpMessage {
    }
 
    /**
+    * Sets the priority header on the outgoing message.
+    *
+    * @param priority the priority value to set.
+    */
+   public void setPriority(short priority) {
+      checkReadOnly();
+      lazyCreateHeader();
+      getWrappedMessage().setPriority(priority);
+   }
+
+   /**
+    * Gets the priority header value of the message.
+    */
+   public short getPriority() {
+      return getWrappedMessage().getPriority();
+   }
+
+
+   /**
     * Sets a given application property on an outbound message.
     *
     * @param key   the name to assign the new property.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d471f6b1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index aae2650..3e9072c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -175,6 +175,126 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
+   public void testMessageDurableFalse() throws Exception {
+      sendMessages(getTestName(), 1, false);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertFalse(receive.isDurable());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageDurableTrue() throws Exception {
+      sendMessages(getTestName(), 1, true);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertTrue(receive.isDurable());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageDefaultPriority() throws Exception {
+      sendMessages(getTestName(), 1, (short) 4);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertEquals((short) 4, receive.getPriority());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageNonDefaultPriority() throws Exception {
+      sendMessages(getTestName(), 1, (short) 0);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertEquals((short) 0, receive.getPriority());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
+   public void testMessageNoPriority() throws Exception {
+      sendMessages(getTestName(), 1);
+
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      AmqpSession session = connection.createSession();
+
+      AmqpReceiver receiver = session.createReceiver(getTestName());
+
+      Queue queueView = getProxyToQueue(getTestName());
+      assertEquals(1, queueView.getMessageCount());
+
+      receiver.flow(1);
+      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+      assertNotNull(receive);
+      assertEquals((short) 4, receive.getPriority());
+      receiver.close();
+
+      assertEquals(1, queueView.getMessageCount());
+
+      connection.close();
+   }
+
+   @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
       int MSG_COUNT = 4;
       sendMessages(getTestName(), MSG_COUNT);
@@ -849,4 +969,41 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
          connection.close();
       }
    }
+
+
+   public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            message.setDurable(durable);
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
+      }
+   }
+
+   public void sendMessages(String destinationName, int count, short priority) throws Exception {
+      AmqpClient client = createAmqpClient();
+      AmqpConnection connection = addConnection(client.connect());
+      try {
+         AmqpSession session = connection.createSession();
+         AmqpSender sender = session.createSender(destinationName);
+
+         for (int i = 0; i < count; ++i) {
+            AmqpMessage message = new AmqpMessage();
+            message.setMessageId("MessageID:" + i);
+            message.setPriority(priority);
+            sender.send(message);
+         }
+      } finally {
+         connection.close();
+      }
+   }
 }