You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/05/20 21:03:53 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-4900
Repository: activemq
Updated Branches:
refs/heads/master c07d6c841 -> f05ff94e5
https://issues.apache.org/jira/browse/AMQ-4900
Strip delivery annotations from the incoming messages when using JMS
transformer, the other transformers don't currently have a way to do
this.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f05ff94e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f05ff94e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f05ff94e
Branch: refs/heads/master
Commit: f05ff94e5c228921097dcb8f835cdfa4fe7d0a26
Parents: c07d6c8
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed May 20 14:54:13 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed May 20 14:54:13 2015 -0400
----------------------------------------------------------------------
.../message/AMQPNativeOutboundTransformer.java | 5 +-
.../amqp/message/InboundTransformer.java | 9 --
.../transport/amqp/client/AmqpMessage.java | 46 ++++++++++
.../interop/AmqpDeliveryAnnotationsTest.java | 96 ++++++++++++++++++++
4 files changed, 143 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05ff94e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index 19573f9..c1dc976 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -38,10 +38,7 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
@Override
public EncodedMessage transform(Message msg) throws Exception {
- if (msg == null) {
- return null;
- }
- if (!(msg instanceof BytesMessage)) {
+ if (msg == null || !(msg instanceof BytesMessage)) {
return null;
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05ff94e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index 9ceb096..eff0ae7 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -33,7 +33,6 @@ import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.UnsignedShort;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.Footer;
import org.apache.qpid.proton.amqp.messaging.Header;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
@@ -126,14 +125,6 @@ public abstract class InboundTransformer {
vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
}
- final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
- if (da != null) {
- for (Map.Entry<?, ?> entry : da.getValue().entrySet()) {
- String key = entry.getKey().toString();
- setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
- }
- }
-
final MessageAnnotations ma = amqp.getMessageAnnotations();
if (ma != null) {
for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05ff94e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 32dd1be..705a90d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -26,6 +26,7 @@ import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.engine.Delivery;
@@ -37,6 +38,7 @@ public class AmqpMessage {
private final Message message;
private final Delivery delivery;
+ private Map<Symbol, Object> deliveryAnnotationsMap;
private Map<Symbol, Object> messageAnnotationsMap;
private Map<String, Object> applicationPropertiesMap;
@@ -87,6 +89,10 @@ public class AmqpMessage {
if (message.getApplicationProperties() != null) {
applicationPropertiesMap = message.getApplicationProperties().getValue();
}
+
+ if (message.getDeliveryAnnotations() != null) {
+ deliveryAnnotationsMap = message.getDeliveryAnnotations().getValue();
+ }
}
//----- Access to interal client resources -------------------------------//
@@ -303,6 +309,39 @@ public class AmqpMessage {
}
/**
+ * Perform a proper delivery annotation set on the AMQP Message based on a Symbol
+ * key and the target value to append to the current delivery annotations.
+ *
+ * @param key
+ * The name of the Symbol whose value is being set.
+ * @param value
+ * The new value to set in the delivery annotations of this message.
+ */
+ public void setDeliveryAnnotation(String key, Object value) {
+ checkReadOnly();
+ lazyCreateDeliveryAnnotations();
+ deliveryAnnotationsMap.put(Symbol.valueOf(key), value);
+ }
+
+ /**
+ * Given a message annotation name, lookup and return the value associated with
+ * that annotation name. If the message annotations have not been created yet
+ * then this method will always return null.
+ *
+ * @param key
+ * the Symbol name that should be looked up in the message annotations.
+ *
+ * @return the value of the annotation if it exists, or null if not set or not accessible.
+ */
+ public Object getDeliveryAnnotation(String key) {
+ if (deliveryAnnotationsMap == null) {
+ return null;
+ }
+
+ return deliveryAnnotationsMap.get(Symbol.valueOf(key));
+ }
+
+ /**
* Sets a String value into the body of an outgoing Message, throws
* an exception if this is an incoming message instance.
*
@@ -347,6 +386,13 @@ public class AmqpMessage {
}
}
+ private void lazyCreateDeliveryAnnotations() {
+ if (deliveryAnnotationsMap == null) {
+ deliveryAnnotationsMap = new HashMap<Symbol,Object>();
+ message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
+ }
+ }
+
private void lazyCreateApplicationProperties() {
if (applicationPropertiesMap == null) {
applicationPropertiesMap = new HashMap<String, Object>();
http://git-wip-us.apache.org/repos/asf/activemq/blob/f05ff94e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDeliveryAnnotationsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDeliveryAnnotationsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDeliveryAnnotationsTest.java
new file mode 100644
index 0000000..bc992d0
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDeliveryAnnotationsTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test around the handling of Deliver Annotations in messages sent and received.
+ */
+@RunWith(Parameterized.class)
+public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
+
+ private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION";
+
+ private final String transformer;
+
+ @Parameters(name="{0}")
+ public static Collection<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ {"jms"},
+ // {"native"},
+ // {"raw"} We cannot fix these now because proton has no way to selectively
+ // prune the incoming message bytes from delivery annotations section
+ // can be stripped from the message.
+ });
+ }
+
+ public AmqpDeliveryAnnotationsTest(String transformer) {
+ this.transformer = transformer;
+ }
+
+ @Override
+ protected String getAmqpTransformer() {
+ return transformer;
+ }
+
+ @Test(timeout = 60000)
+ public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender("queue://" + getTestName());
+ AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+ AmqpMessage message = new AmqpMessage();
+
+ message.setText("Test-Message");
+ message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName());
+
+ sender.send(message);
+ receiver.flow(1);
+
+ QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(1, queue.getQueueSize());
+
+ AmqpMessage received = receiver.receive(); //5, TimeUnit.SECONDS);
+ assertNotNull(received);
+ assertNull(received.getDeliveryAnnotation(DELIVERY_ANNOTATION_NAME));
+
+ sender.close();
+ connection.close();
+ }
+}