You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/12/14 15:56:45 UTC
activemq-artemis git commit: Revert "ARTEMIS-888 - AMQP headers arent
always set"
Repository: activemq-artemis
Updated Branches:
refs/heads/master 9954b42b7 -> 08e0c5e4f
Revert "ARTEMIS-888 - AMQP headers arent always set"
This reverts commit d471f6b15fab7d7afad8e34635869df18ac0cef4.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/08e0c5e4
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/08e0c5e4
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/08e0c5e4
Branch: refs/heads/master
Commit: 08e0c5e4f1c34b6fe82bf232ce87fd5bc774def4
Parents: 9954b42
Author: Clebert Suconic <cl...@apache.org>
Authored: Wed Dec 14 10:20:18 2016 -0500
Committer: Clebert Suconic <cl...@apache.org>
Committed: Wed Dec 14 10:20:18 2016 -0500
----------------------------------------------------------------------
.../message/JMSMappingOutboundTransformer.java | 38 +++--
.../transport/amqp/client/AmqpMessage.java | 19 ---
.../integration/amqp/AmqpSendReceiveTest.java | 157 -------------------
3 files changed, 29 insertions(+), 185 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/08e0c5e4/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
index baec5f9..2fa7145 100644
--- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
+++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java
@@ -54,6 +54,7 @@ import java.util.Set;
import javax.jms.Destination;
import javax.jms.JMSException;
+import javax.jms.Message;
import javax.jms.MessageEOFException;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
@@ -130,7 +131,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
}
long messageFormat = 0;
- Header header = new Header();
+ Header header = null;
Properties properties = null;
Map<Symbol, Object> daMap = null;
Map<Symbol, Object> maMap = null;
@@ -139,12 +140,19 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
Section body = convertBody(message);
- header.setDurable(message.getInnerMessage().isDurable());
-
+ if (message.getInnerMessage().isDurable()) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setDurable(true);
+ }
byte priority = (byte) message.getJMSPriority();
-
- header.setPriority(UnsignedByte.valueOf(priority));
-
+ if (priority != Message.DEFAULT_PRIORITY) {
+ if (header == null) {
+ header = new Header();
+ }
+ header.setPriority(UnsignedByte.valueOf(priority));
+ }
String type = message.getJMSType();
if (type != null) {
if (properties == null) {
@@ -152,7 +160,6 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
}
properties.setSubject(type);
}
-
String messageId = message.getJMSMessageID();
if (messageId != null) {
if (properties == null) {
@@ -204,6 +211,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
ttl = 1;
}
+ if (header == null) {
+ header = new Header();
+ }
header.setTtl(new UnsignedInteger((int) ttl));
if (properties == null) {
@@ -227,6 +237,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
// whereas JMSXDeliveryCount includes the first/current delivery attempt.
int amqpDeliveryCount = message.getDeliveryCount() - 1;
if (amqpDeliveryCount > 0) {
+ if (header == null) {
+ header = new Header();
+ }
header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount));
}
continue;
@@ -264,9 +277,15 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
// skip..internal use only
continue;
} else if (key.equals(JMS_AMQP_FIRST_ACQUIRER)) {
+ if (header == null) {
+ header = new Header();
+ }
header.setFirstAcquirer(message.getBooleanProperty(key));
continue;
} else if (key.equals(JMS_AMQP_HEADER)) {
+ if (header == null) {
+ header = new Header();
+ }
continue;
} else if (key.startsWith(JMS_AMQP_PROPERTIES)) {
if (properties == null) {
@@ -346,8 +365,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer {
EncoderImpl encoder = tlsCodec.get().encoder;
encoder.setByteBuffer(buffer);
- encoder.writeObject(header);
-
+ if (header != null) {
+ encoder.writeObject(header);
+ }
if (daMap != null) {
encoder.writeObject(new DeliveryAnnotations(daMap));
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/08e0c5e4/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 5cf2c0a..f7a9364 100644
--- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -402,25 +402,6 @@ public class AmqpMessage {
}
/**
- * Sets the priority header on the outgoing message.
- *
- * @param priority the priority value to set.
- */
- public void setPriority(short priority) {
- checkReadOnly();
- lazyCreateHeader();
- getWrappedMessage().setPriority(priority);
- }
-
- /**
- * Sets the priority header on the outgoing message.
- */
- public short getPriority() {
- return getWrappedMessage().getPriority();
- }
-
-
- /**
* Sets a given application property on an outbound message.
*
* @param key the name to assign the new property.
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/08e0c5e4/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
index 3e9072c..aae2650 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpSendReceiveTest.java
@@ -175,126 +175,6 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
}
@Test(timeout = 60000)
- public void testMessageDurableFalse() throws Exception {
- sendMessages(getTestName(), 1, false);
-
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- AmqpReceiver receiver = session.createReceiver(getTestName());
-
- Queue queueView = getProxyToQueue(getTestName());
- assertEquals(1, queueView.getMessageCount());
-
- receiver.flow(1);
- AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(receive);
- assertFalse(receive.isDurable());
- receiver.close();
-
- assertEquals(1, queueView.getMessageCount());
-
- connection.close();
- }
-
- @Test(timeout = 60000)
- public void testMessageDurableTrue() throws Exception {
- sendMessages(getTestName(), 1, true);
-
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- AmqpReceiver receiver = session.createReceiver(getTestName());
-
- Queue queueView = getProxyToQueue(getTestName());
- assertEquals(1, queueView.getMessageCount());
-
- receiver.flow(1);
- AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(receive);
- assertTrue(receive.isDurable());
- receiver.close();
-
- assertEquals(1, queueView.getMessageCount());
-
- connection.close();
- }
-
- @Test(timeout = 60000)
- public void testMessageDefaultPriority() throws Exception {
- sendMessages(getTestName(), 1, (short) 4);
-
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- AmqpReceiver receiver = session.createReceiver(getTestName());
-
- Queue queueView = getProxyToQueue(getTestName());
- assertEquals(1, queueView.getMessageCount());
-
- receiver.flow(1);
- AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(receive);
- assertEquals((short) 4, receive.getPriority());
- receiver.close();
-
- assertEquals(1, queueView.getMessageCount());
-
- connection.close();
- }
-
- @Test(timeout = 60000)
- public void testMessageNonDefaultPriority() throws Exception {
- sendMessages(getTestName(), 1, (short) 0);
-
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- AmqpReceiver receiver = session.createReceiver(getTestName());
-
- Queue queueView = getProxyToQueue(getTestName());
- assertEquals(1, queueView.getMessageCount());
-
- receiver.flow(1);
- AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(receive);
- assertEquals((short) 0, receive.getPriority());
- receiver.close();
-
- assertEquals(1, queueView.getMessageCount());
-
- connection.close();
- }
-
- @Test(timeout = 60000)
- public void testMessageNoPriority() throws Exception {
- sendMessages(getTestName(), 1);
-
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- AmqpSession session = connection.createSession();
-
- AmqpReceiver receiver = session.createReceiver(getTestName());
-
- Queue queueView = getProxyToQueue(getTestName());
- assertEquals(1, queueView.getMessageCount());
-
- receiver.flow(1);
- AmqpMessage receive = receiver.receive(5, TimeUnit.SECONDS);
- assertNotNull(receive);
- assertEquals((short) 4, receive.getPriority());
- receiver.close();
-
- assertEquals(1, queueView.getMessageCount());
-
- connection.close();
- }
-
- @Test(timeout = 60000)
public void testTwoQueueReceiversOnSameConnectionReadMessagesNoDispositions() throws Exception {
int MSG_COUNT = 4;
sendMessages(getTestName(), MSG_COUNT);
@@ -969,41 +849,4 @@ public class AmqpSendReceiveTest extends AmqpClientTestSupport {
connection.close();
}
}
-
-
- public void sendMessages(String destinationName, int count, boolean durable) throws Exception {
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- try {
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(destinationName);
-
- for (int i = 0; i < count; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setMessageId("MessageID:" + i);
- message.setDurable(durable);
- sender.send(message);
- }
- } finally {
- connection.close();
- }
- }
-
- public void sendMessages(String destinationName, int count, short priority) throws Exception {
- AmqpClient client = createAmqpClient();
- AmqpConnection connection = addConnection(client.connect());
- try {
- AmqpSession session = connection.createSession();
- AmqpSender sender = session.createSender(destinationName);
-
- for (int i = 0; i < count; ++i) {
- AmqpMessage message = new AmqpMessage();
- message.setMessageId("MessageID:" + i);
- message.setPriority(priority);
- sender.send(message);
- }
- } finally {
- connection.close();
- }
- }
}