You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/14 15:56:45 UTC

activemq-artemis git commit: Revert "ARTEMIS-888 - AMQP headers arent always set"

Repository: activemq-artemis
Updated Branches:
  refs/heads/master 9954b42b7 -> 08e0c5e4f


Revert "ARTEMIS-888 - AMQP headers arent always set"

This reverts commit d471f6b15fab7d7afad8e34635869df18ac0cef4.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/08e0c5e4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/08e0c5e4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/08e0c5e4

Branch: refs/heads/master
Commit: 08e0c5e4f1c34b6fe82bf232ce87fd5bc774def4
Parents: 9954b42
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Dec 14 10:20:18 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 14 10:20:18 2016 -0500

----------------------------------------------------------------------
 .../message/JMSMappingOutboundTransformer.java  |  38 +++--
 .../transport/amqp/client/AmqpMessage.java      |  19 ---
 .../integration/amqp/AmqpSendReceiveTest.java   | 157 -------------------
 3 files changed, 29 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/08e0c5e4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index baec5f9..2fa7145 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -54,6 +54,7 @@ import java.util.Set;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
+import javax.jms.Message;
 import javax.jms.MessageEOFException;
 import javax.jms.Queue;
 import javax.jms.TemporaryQueue;
@@ -130,7 +131,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
       }
 
       long messageFormat = 0;
-      Header header = new Header();
+      Header header = null;
       Properties properties = null;
       Map<Symbol, Object> daMap = null;
       Map<Symbol, Object> maMap = null;
@@ -139,12 +140,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
 
       Section body = convertBody(message);
 
-      header.setDurable(message.getInnerMessage().isDurable());
-
+      if (message.getInnerMessage().isDurable()) {
+         if (header == null) {
+            header = new Header();
+         }
+         header.setDurable(true);
+      }
       byte priority = (byte) message.getJMSPriority();
-
-      header.setPriority(UnsignedByte.valueOf(priority));
-
+      if (priority != Message.DEFAULT_PRIORITY) {
+         if (header == null) {
+            header = new Header();
+         }
+         header.setPriority(UnsignedByte.valueOf(priority));
+      }
       String type = message.getJMSType();
       if (type != null) {
          if (properties == null) {
@@ -152,7 +160,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
          }
          properties.setSubject(type);
       }
-
       String messageId = message.getJMSMessageID();
       if (messageId != null) {
          if (properties == null) {
@@ -204,6 +211,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
             ttl = 1;
          }
 
+         if (header == null) {
+            header = new Header();
+         }
          header.setTtl(new UnsignedInteger((int) ttl));
 
          if (properties == null) {
@@ -227,6 +237,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                // whereas JMSXDeliveryCount includes the first/current delivery attempt.
                int amqpDeliveryCount = message.getDeliveryCount() - 1;
                if (amqpDeliveryCount > 0) {
+                  if (header == null) {
+                     header = new Header();
+                  }
                   header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
                }
                continue;
@@ -264,9 +277,15 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
                // skip..internal use only
                continue;
             } else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
+               if (header == null) {
+                  header = new Header();
+               }
                header.setFirstAcquirer(message.getBooleanProperty(key));
                continue;
             } else if (key.equals(JMS_AMQP_HEADER)) {
+               if (header == null) {
+                  header = new Header();
+               }
                continue;
             } else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
                if (properties == null) {
@@ -346,8 +365,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
       EncoderImpl encoder = tlsCodec.get().encoder;
       encoder.setByteBuffer(buffer);
 
-      encoder.writeObject(header);
-
+      if (header != null) {
+         encoder.writeObject(header);
+      }
       if (daMap != null) {
          encoder.writeObject(new DeliveryAnnotations(daMap));
       }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/08e0c5e4/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 5cf2c0a..f7a9364 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -402,25 +402,6 @@ public class AmqpMessage {
    }
 
    /**
-    * Sets the priority header on the outgoing message.
-    *
-    * @param priority the priority value to set.
-    */
-   public void setPriority(short priority) {
-      checkReadOnly();
-      lazyCreateHeader();
-      getWrappedMessage().setPriority(priority);
-   }
-
-   /**
-    * Sets the priority header on the outgoing message.
-    */
-   public short getPriority() {
-      return getWrappedMessage().getPriority();
-   }
-
-
-   /**
     * Sets a given application property on an outbound message.
     *
     * @param key   the name to assign the new property.

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/08e0c5e4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 3e9072c..aae2650 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -175,126 +175,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
    }
 
    @Test(timeout = 60000)
-   public void testMessageDurableFalse() throws Exception {
-      sendMessages(getTestName(), 1, false);
-
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-
-      Queue queueView = getProxyToQueue(getTestName());
-      assertEquals(1, queueView.getMessageCount());
-
-      receiver.flow(1);
-      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
-      assertNotNull(receive);
-      assertFalse(receive.isDurable());
-      receiver.close();
-
-      assertEquals(1, queueView.getMessageCount());
-
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testMessageDurableTrue() throws Exception {
-      sendMessages(getTestName(), 1, true);
-
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-
-      Queue queueView = getProxyToQueue(getTestName());
-      assertEquals(1, queueView.getMessageCount());
-
-      receiver.flow(1);
-      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
-      assertNotNull(receive);
-      assertTrue(receive.isDurable());
-      receiver.close();
-
-      assertEquals(1, queueView.getMessageCount());
-
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testMessageDefaultPriority() throws Exception {
-      sendMessages(getTestName(), 1, (short) 4);
-
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-
-      Queue queueView = getProxyToQueue(getTestName());
-      assertEquals(1, queueView.getMessageCount());
-
-      receiver.flow(1);
-      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
-      assertNotNull(receive);
-      assertEquals((short) 4, receive.getPriority());
-      receiver.close();
-
-      assertEquals(1, queueView.getMessageCount());
-
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testMessageNonDefaultPriority() throws Exception {
-      sendMessages(getTestName(), 1, (short) 0);
-
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-
-      Queue queueView = getProxyToQueue(getTestName());
-      assertEquals(1, queueView.getMessageCount());
-
-      receiver.flow(1);
-      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
-      assertNotNull(receive);
-      assertEquals((short) 0, receive.getPriority());
-      receiver.close();
-
-      assertEquals(1, queueView.getMessageCount());
-
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
-   public void testMessageNoPriority() throws Exception {
-      sendMessages(getTestName(), 1);
-
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      AmqpSession session = connection.createSession();
-
-      AmqpReceiver receiver = session.createReceiver(getTestName());
-
-      Queue queueView = getProxyToQueue(getTestName());
-      assertEquals(1, queueView.getMessageCount());
-
-      receiver.flow(1);
-      AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
-      assertNotNull(receive);
-      assertEquals((short) 4, receive.getPriority());
-      receiver.close();
-
-      assertEquals(1, queueView.getMessageCount());
-
-      connection.close();
-   }
-
-   @Test(timeout = 60000)
    public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
       int MSG_COUNT = 4;
       sendMessages(getTestName(), MSG_COUNT);
@@ -969,41 +849,4 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
          connection.close();
       }
    }
-
-
-   public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      try {
-         AmqpSession session = connection.createSession();
-         AmqpSender sender = session.createSender(destinationName);
-
-         for (int i = 0; i < count; ++i) {
-            AmqpMessage message = new AmqpMessage();
-            message.setMessageId("MessageID:" + i);
-            message.setDurable(durable);
-            sender.send(message);
-         }
-      } finally {
-         connection.close();
-      }
-   }
-
-   public void sendMessages(String destinationName, int count, short priority) throws Exception {
-      AmqpClient client = createAmqpClient();
-      AmqpConnection connection = addConnection(client.connect());
-      try {
-         AmqpSession session = connection.createSession();
-         AmqpSender sender = session.createSender(destinationName);
-
-         for (int i = 0; i < count; ++i) {
-            AmqpMessage message = new AmqpMessage();
-            message.setMessageId("MessageID:" + i);
-            message.setPriority(priority);
-            sender.send(message);
-         }
-      } finally {
-         connection.close();
-      }
-   }
 }