You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/14 14:25:23 UTC
[2/2] activemq-artemis git commit: ARTEMIS-888 - AMQP headers aren't
always set
ARTEMIS-888 - AMQP headers aren't always set
https://issues.apache.org/jira/browse/ARTEMIS-888
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/d471f6b1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/d471f6b1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/d471f6b1
Branch: refs/heads/master
Commit: d471f6b15fab7d7afad8e34635869df18ac0cef4
Parents: d49a517
Author: Andy Taylor <an...@gmail.com>
Authored: Wed Dec 14 13:43:19 2016 +0000
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 14 09:25:10 2016 -0500
----------------------------------------------------------------------
.../message/JMSMappingOutboundTransformer.java | 38 ++---
.../transport/amqp/client/AmqpMessage.java | 19 +++
.../integration/amqp/AmqpSendReceiveTest.java | 157 +++++++++++++++++++
3 files changed, 185 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d471f6b1/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index 2fa7145..baec5f9 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -54,7 +54,6 @@ import java.util.Set;
import javax.jms.Destination;
import javax.jms.JMSException;
-import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
@@ -131,7 +130,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
}
long messageFormat = 0;
- Header header = null;
+ Header header = new Header();
Properties properties = null;
Map<Symbol, Object> daMap = null;
Map<Symbol, Object> maMap = null;
@@ -140,19 +139,12 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
Section body = convertBody(message);
- if (message.getInnerMessage().isDurable()) {
- if (header == null) {
- header = new Header();
- }
- header.setDurable(true);
- }
+ header.setDurable(message.getInnerMessage().isDurable());
+
byte priority = (byte) message.getJMSPriority();
- if (priority != Message.DEFAULT_PRIORITY) {
- if (header == null) {
- header = new Header();
- }
- header.setPriority(UnsignedByte.valueOf(priority));
- }
+
+ header.setPriority(UnsignedByte.valueOf(priority));
+
String type = message.getJMSType();
if (type != null) {
if (properties == null) {
@@ -160,6 +152,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
}
properties.setSubject(type);
}
+
String messageId = message.getJMSMessageID();
if (messageId != null) {
if (properties == null) {
@@ -211,9 +204,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
ttl = 1;
}
- if (header == null) {
- header = new Header();
- }
header.setTtl(new UnsignedInteger((int) ttl));
if (properties == null) {
@@ -237,9 +227,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
// whereas JMSXDeliveryCount includes the first/current delivery attempt.
int amqpDeliveryCount = message.getDeliveryCount() - 1;
if (amqpDeliveryCount > 0) {
- if (header == null) {
- header = new Header();
- }
header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
}
continue;
@@ -277,15 +264,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
// skip..internal use only
continue;
} else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
- if (header == null) {
- header = new Header();
- }
header.setFirstAcquirer(message.getBooleanProperty(key));
continue;
} else if (key.equals(JMS_AMQP_HEADER)) {
- if (header == null) {
- header = new Header();
- }
continue;
} else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
if (properties == null) {
@@ -365,9 +346,8 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
EncoderImpl encoder = tlsCodec.get().encoder;
encoder.setByteBuffer(buffer);
- if (header != null) {
- encoder.writeObject(header);
- }
+ encoder.writeObject(header);
+
if (daMap != null) {
encoder.writeObject(new DeliveryAnnotations(daMap));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d471f6b1/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index f7a9364..5cf2c0a 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -402,6 +402,25 @@ public class AmqpMessage {
}
/**
+ * Sets the priority header on the outgoing message.
+ *
+ * @param priority the priority value to set.
+ */
+ public void setPriority(short priority) {
+ checkReadOnly();
+ lazyCreateHeader();
+ getWrappedMessage().setPriority(priority);
+ }
+
+ /**
+ * Returns the priority value of the wrapped message.
+ */
+ public short getPriority() {
+ return getWrappedMessage().getPriority();
+ }
+
+
+ /**
* Sets a given application property on an outbound message.
*
* @param key the name to assign the new property.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/d471f6b1/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index aae2650..3e9072c 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -175,6 +175,126 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
+ public void testMessageDurableFalse() throws Exception {
+ sendMessages(getTestName(), 1, false);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(1, queueView.getMessageCount());
+
+ receiver.flow(1);
+ AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(receive);
+ assertFalse(receive.isDurable());
+ receiver.close();
+
+ assertEquals(1, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageDurableTrue() throws Exception {
+ sendMessages(getTestName(), 1, true);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(1, queueView.getMessageCount());
+
+ receiver.flow(1);
+ AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(receive);
+ assertTrue(receive.isDurable());
+ receiver.close();
+
+ assertEquals(1, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageDefaultPriority() throws Exception {
+ sendMessages(getTestName(), 1, (short) 4);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(1, queueView.getMessageCount());
+
+ receiver.flow(1);
+ AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(receive);
+ assertEquals((short) 4, receive.getPriority());
+ receiver.close();
+
+ assertEquals(1, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageNonDefaultPriority() throws Exception {
+ sendMessages(getTestName(), 1, (short) 0);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(1, queueView.getMessageCount());
+
+ receiver.flow(1);
+ AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(receive);
+ assertEquals((short) 0, receive.getPriority());
+ receiver.close();
+
+ assertEquals(1, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
+ public void testMessageNoPriority() throws Exception {
+ sendMessages(getTestName(), 1);
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ AmqpSession session = connection.createSession();
+
+ AmqpReceiver receiver = session.createReceiver(getTestName());
+
+ Queue queueView = getProxyToQueue(getTestName());
+ assertEquals(1, queueView.getMessageCount());
+
+ receiver.flow(1);
+ AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
+ assertNotNull(receive);
+ assertEquals((short) 4, receive.getPriority());
+ receiver.close();
+
+ assertEquals(1, queueView.getMessageCount());
+
+ connection.close();
+ }
+
+ @Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT);
@@ -849,4 +969,41 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
}
+
+
+ public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(destinationName);
+
+ for (int i = 0; i < count; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("MessageID:" + i);
+ message.setDurable(durable);
+ sender.send(message);
+ }
+ } finally {
+ connection.close();
+ }
+ }
+
+ public void sendMessages(String destinationName, int count, short priority) throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = addConnection(client.connect());
+ try {
+ AmqpSession session = connection.createSession();
+ AmqpSender sender = session.createSender(destinationName);
+
+ for (int i = 0; i < count; ++i) {
+ AmqpMessage message = new AmqpMessage();
+ message.setMessageId("MessageID:" + i);
+ message.setPriority(priority);
+ sender.send(message);
+ }
+ } finally {
+ connection.close();
+ }
+ }
}