You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/05/20 21:03:53 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-4900

Repository: activemq
Updated Branches:
  refs/heads/master c07d6c841 -> f05ff94e5


https://issues.apache.org/jira/browse/AMQ-4900

Strip delivery annotations from the incoming messages when using JMS
transformer, the other transformers don't currently have a way to do
this. 

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f05ff94e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f05ff94e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f05ff94e

Branch: refs/heads/master
Commit: f05ff94e5c228921097dcb8f835cdfa4fe7d0a26
Parents: c07d6c8
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed May 20 14:54:13 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed May 20 14:54:13 2015 -0400

----------------------------------------------------------------------
 .../message/AMQPNativeOutboundTransformer.java  |  5 +-
 .../amqp/message/InboundTransformer.java        |  9 --
 .../transport/amqp/client/AmqpMessage.java      | 46 ++++++++++
 .../interop/AmqpDeliveryAnnotationsTest.java    | 96 ++++++++++++++++++++
 4 files changed, 143 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f05ff94e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
index 19573f9..c1dc976 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/AMQPNativeOutboundTransformer.java
@@ -38,10 +38,7 @@ public class AMQPNativeOutboundTransformer extends OutboundTransformer {
 
     @Override
     public EncodedMessage transform(Message msg) throws Exception {
-        if (msg == null) {
-            return null;
-        }
-        if (!(msg instanceof BytesMessage)) {
+        if (msg == null || !(msg instanceof BytesMessage)) {
             return null;
         }
 

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05ff94e/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
index 9ceb096..eff0ae7 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/message/InboundTransformer.java
@@ -33,7 +33,6 @@ import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.UnsignedShort;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
-import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Footer;
 import org.apache.qpid.proton.amqp.messaging.Header;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
@@ -126,14 +125,6 @@ public abstract class InboundTransformer {
             vendor.setJMSXDeliveryCount(jms, header.getDeliveryCount().longValue());
         }
 
-        final DeliveryAnnotations da = amqp.getDeliveryAnnotations();
-        if (da != null) {
-            for (Map.Entry<?, ?> entry : da.getValue().entrySet()) {
-                String key = entry.getKey().toString();
-                setProperty(jms, prefixVendor + prefixDeliveryAnnotations + key, entry.getValue());
-            }
-        }
-
         final MessageAnnotations ma = amqp.getMessageAnnotations();
         if (ma != null) {
             for (Map.Entry<?, ?> entry : ma.getValue().entrySet()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05ff94e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 32dd1be..705a90d 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -26,6 +26,7 @@ import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
 import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.engine.Delivery;
@@ -37,6 +38,7 @@ public class AmqpMessage {
     private final Message message;
     private final Delivery delivery;
 
+    private Map<Symbol, Object> deliveryAnnotationsMap;
     private Map<Symbol, Object> messageAnnotationsMap;
     private Map<String, Object> applicationPropertiesMap;
 
@@ -87,6 +89,10 @@ public class AmqpMessage {
         if (message.getApplicationProperties() != null) {
             applicationPropertiesMap = message.getApplicationProperties().getValue();
         }
+
+        if (message.getDeliveryAnnotations() != null) {
+            deliveryAnnotationsMap = message.getDeliveryAnnotations().getValue();
+        }
     }
 
     //----- Access to interal client resources -------------------------------//
@@ -303,6 +309,39 @@ public class AmqpMessage {
     }
 
     /**
+     * Perform a proper delivery annotation set on the AMQP Message based on a Symbol
+     * key and the target value to append to the current delivery annotations.
+     *
+     * @param key
+     *        The name of the Symbol whose value is being set.
+     * @param value
+     *        The new value to set in the delivery annotations of this message.
+     */
+    public void setDeliveryAnnotation(String key, Object value) {
+        checkReadOnly();
+        lazyCreateDeliveryAnnotations();
+        deliveryAnnotationsMap.put(Symbol.valueOf(key), value);
+    }
+
+    /**
+     * Given a message annotation name, lookup and return the value associated with
+     * that annotation name.  If the message annotations have not been created yet
+     * then this method will always return null.
+     *
+     * @param key
+     *        the Symbol name that should be looked up in the message annotations.
+     *
+     * @return the value of the annotation if it exists, or null if not set or not accessible.
+     */
+    public Object getDeliveryAnnotation(String key) {
+        if (deliveryAnnotationsMap == null) {
+            return null;
+        }
+
+        return deliveryAnnotationsMap.get(Symbol.valueOf(key));
+    }
+
+    /**
      * Sets a String value into the body of an outgoing Message, throws
      * an exception if this is an incoming message instance.
      *
@@ -347,6 +386,13 @@ public class AmqpMessage {
         }
     }
 
+    private void lazyCreateDeliveryAnnotations() {
+        if (deliveryAnnotationsMap == null) {
+            deliveryAnnotationsMap = new HashMap<Symbol,Object>();
+            message.setDeliveryAnnotations(new DeliveryAnnotations(deliveryAnnotationsMap));
+        }
+    }
+
     private void lazyCreateApplicationProperties() {
         if (applicationPropertiesMap == null) {
             applicationPropertiesMap = new HashMap<String, Object>();

http://git-wip-us.apache.org/repos/asf/activemq/blob/f05ff94e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDeliveryAnnotationsTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDeliveryAnnotationsTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDeliveryAnnotationsTest.java
new file mode 100644
index 0000000..bc992d0
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDeliveryAnnotationsTest.java
@@ -0,0 +1,96 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+
+/**
+ * Test around the handling of Deliver Annotations in messages sent and received.
+ */
+@RunWith(Parameterized.class)
+public class AmqpDeliveryAnnotationsTest extends AmqpClientTestSupport {
+
+    private final String DELIVERY_ANNOTATION_NAME = "TEST-DELIVERY-ANNOTATION";
+
+    private final String transformer;
+
+    @Parameters(name="{0}")
+    public static Collection<Object[]> data() {
+        return Arrays.asList(new Object[][] {
+            {"jms"},
+            // {"native"},
+            // {"raw"}  We cannot fix these now because proton has no way to selectively
+            //          prune the incoming message bytes from delivery annotations section
+            //          can be stripped from the message.
+        });
+    }
+
+    public AmqpDeliveryAnnotationsTest(String transformer) {
+        this.transformer = transformer;
+    }
+
+    @Override
+    protected String getAmqpTransformer() {
+        return transformer;
+    }
+
+    @Test(timeout = 60000)
+    public void testDeliveryAnnotationsStrippedFromIncoming() throws Exception {
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+        AmqpReceiver receiver = session.createReceiver("queue://" + getTestName());
+
+        AmqpMessage message = new AmqpMessage();
+
+        message.setText("Test-Message");
+        message.setDeliveryAnnotation(DELIVERY_ANNOTATION_NAME, getTestName());
+
+        sender.send(message);
+        receiver.flow(1);
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(1, queue.getQueueSize());
+
+        AmqpMessage received = receiver.receive(); //5, TimeUnit.SECONDS);
+        assertNotNull(received);
+        assertNull(received.getDeliveryAnnotation(DELIVERY_ANNOTATION_NAME));
+
+        sender.close();
+        connection.close();
+    }
+}