You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/27 22:31:52 UTC

activemq git commit: https://issues.apache.org/jira/browse/AMQ-5684

Repository: activemq
Updated Branches:
  refs/heads/master 05ff52dc1 -> f56ea45e5


https://issues.apache.org/jira/browse/AMQ-5684

Adds a new test case to use when investigating AmqpNetLite test failures
with the 'JMS' transformer used on the AMQP TransportConnector.

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f56ea45e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f56ea45e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f56ea45e

Branch: refs/heads/master
Commit: f56ea45e58a17fa3aad46cbe8fc605ef4ffdbc81
Parents: 05ff52d
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 27 17:31:46 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 27 17:31:46 2015 -0400

----------------------------------------------------------------------
 .../transport/amqp/client/AmqpMessage.java      | 166 +++++++++++++++++--
 .../amqp/interop/AmqpSendReceiveTest.java       |  81 +++++++++
 2 files changed, 234 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/f56ea45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 52e5eaf..9db12f9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -16,9 +16,16 @@
  */
 package org.apache.activemq.transport.amqp.client;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
 import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.message.Message;
 
@@ -28,6 +35,9 @@ public class AmqpMessage {
     private final Message message;
     private final Delivery delivery;
 
+    private Map<Symbol, Object> messageAnnotationsMap;
+    private Map<String, Object> applicationPropertiesMap;
+
     /**
      * Creates a new AmqpMessage that wraps the information necessary to handle
      * an outgoing message.
@@ -62,13 +72,51 @@ public class AmqpMessage {
      * @param delivery
      *        the Delivery instance that produced this message.
      */
+    @SuppressWarnings("unchecked")
     public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
         this.receiver = receiver;
         this.message = message;
         this.delivery = delivery;
+
+        if (message.getMessageAnnotations() != null) {
+            messageAnnotationsMap = message.getMessageAnnotations().getValue();
+        }
+
+        if (message.getApplicationProperties() != null) {
+            applicationPropertiesMap = message.getApplicationProperties().getValue();
+        }
+    }
+
+    //----- Access to internal client resources ------------------------------//
+
+    /**
+     * @return the AMQP Delivery object linked to a received message.
+     */
+    public Delivery getWrappedDelivery() {
+        if (delivery != null) {
+            return new UnmodifiableDelivery(delivery);
+        }
+
+        return null;
     }
 
     /**
+     * @return the AMQP Message that is wrapped by this object.
+     */
+    public Message getWrappedMessage() {
+        return message;
+    }
+
+    /**
+     * @return the AmqpReceiver that consumed this message.
+     */
+    public AmqpReceiver getAmqpReceiver() {
+        return receiver;
+    }
+
+    //----- Message disposition control --------------------------------------//
+
+    /**
      * Accepts the message marking it as consumed on the remote peer.
      *
      * @throws Exception if an error occurs during the accept.
@@ -134,29 +182,96 @@ public class AmqpMessage {
         receiver.release(delivery);
     }
 
+    //----- Convenience methods for constructing outbound messages -----------//
+
     /**
-     * @return the AMQP Delivery object linked to a received message.
+     * Sets the MessageId property on an outbound message using the provided String
+     *
+     * @param messageId
+     *        the String message ID value to set.
      */
-    public Delivery getWrappedDelivery() {
-        if (delivery != null) {
-            return new UnmodifiableDelivery(delivery);
+    public void setMessageId(String messageId) {
+        checkReadOnly();
+        lazyCreateProperties();
+        getWrappedMessage().setMessageId(messageId);
+    }
+
+    /**
+     * Returns the MessageId value in String form. If the message has no properties
+     * section, or that section carries no message ID, this method returns null.
+     *
+     * @return the set message ID in String form or null if not set.
+     */
+    public String getMessageId() {
+        if (message.getProperties() == null || message.getProperties().getMessageId() == null) {
+            return null;
         }
 
-        return null;
+        return message.getProperties().getMessageId().toString();
     }
 
     /**
-     * @return the AMQP Message that is wrapped by this object.
+     * Sets a given application property on an outbound message.
+     *
+     * @param key
+     *        the name to assign the new property.
+     * @param value
+     *        the value to set for the named property.
      */
-    public Message getWrappedMessage() {
-        return message;
+    public void setApplicationProperty(String key, Object value) {
+        checkReadOnly();
+        lazyCreateApplicationProperties();
+        applicationPropertiesMap.put(key, value);
     }
 
     /**
-     * @return the AmqpReceiver that consumed this message.
+     * Gets the application property that is mapped to the given name or null
+     * if no property has been set with that name.
+     *
+     * @param key
+     *        the name used to lookup the property in the application properties.
+     *
+     * @return the property value or null if not set.
      */
-    public AmqpReceiver getAmqpReceiver() {
-        return receiver;
+    public Object getApplicationProperty(String key) {
+        if (applicationPropertiesMap == null) {
+            return null;
+        }
+
+        return applicationPropertiesMap.get(key);
+    }
+
+    /**
+     * Perform a proper annotation set on the AMQP Message based on a Symbol key and
+     * the target value to append to the current annotations.
+     *
+     * @param key
+     *        The name of the Symbol whose value is being set.
+     * @param value
+     *        The new value to set in the annotations of this message.
+     */
+    public void setMessageAnnotation(String key, Object value) {
+        checkReadOnly();
+        lazyCreateMessageAnnotations();
+        messageAnnotationsMap.put(Symbol.valueOf(key), value);
+    }
+
+    /**
+     * Given a message annotation name, lookup and return the value associated with
+     * that annotation name.  If the message annotations have not been created yet
+     * then this method will always return null.
+     *
+     * @param key
+     *        the Symbol name that should be looked up in the message annotations.
+     *
+     * @return the value of the annotation if it exists, or null if not set or not accessible.
+     */
+    public Object getMessageAnnotation(String key) {
+        if (messageAnnotationsMap == null) {
+            return null;
+        }
+
+        return messageAnnotationsMap.get(Symbol.valueOf(key));
     }
 
     /**
@@ -169,11 +284,36 @@ public class AmqpMessage {
      * @throws IllegalStateException if the message is read only.
      */
     public void setText(String value) throws IllegalStateException {
+        checkReadOnly();
+        AmqpValue body = new AmqpValue(value);
+        getWrappedMessage().setBody(body);
+    }
+
+    //----- Internal implementation ------------------------------------------//
+
+    private void checkReadOnly() throws IllegalStateException {
         if (delivery != null) {
             throw new IllegalStateException("Message is read only.");
         }
+    }
 
-        AmqpValue body = new AmqpValue(value);
-        getWrappedMessage().setBody(body);
+    private void lazyCreateMessageAnnotations() {
+        if (messageAnnotationsMap == null) {
+            messageAnnotationsMap = new HashMap<Symbol,Object>();
+            message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
+        }
+    }
+
+    private void lazyCreateApplicationProperties() {
+        if (applicationPropertiesMap == null) {
+            applicationPropertiesMap = new HashMap<String, Object>();
+            message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
+        }
+    }
+
+    private void lazyCreateProperties() {
+        if (message.getProperties() == null) {
+            message.setProperties(new Properties());
+        }
     }
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/f56ea45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
new file mode 100644
index 0000000..b16ef59
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ */
+public class AmqpSendReceiveTest extends AmqpClientTestSupport {
+
+    @Ignore("Test fails when JMS transformer is in play")
+    @Test(timeout = 60000)
+    public void testCloseBusyReceiver() throws Exception {
+        final int MSG_COUNT = 20;
+
+        AmqpClient client = createAmqpClient();
+        AmqpConnection connection = client.connect();
+        AmqpSession session = connection.createSession();
+
+        AmqpSender sender = session.createSender("queue://" + getTestName());
+
+        for (int i = 0; i < MSG_COUNT; i++) {
+            AmqpMessage message = new AmqpMessage();
+
+            message.setMessageId("msg" + i);
+            message.setMessageAnnotation("serialNo", i);
+            message.setText("Test-Message");
+
+            sender.send(message);
+        }
+
+        sender.close();
+
+        QueueViewMBean queue = getProxyToQueue(getTestName());
+        assertEquals(MSG_COUNT, queue.getQueueSize()); // every sent message should be enqueued
+
+        AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+        receiver1.flow(MSG_COUNT); // grant credit for all messages, then read only one
+        AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS);
+        assertEquals("msg0", received.getMessageId());
+        receiver1.close(); // closed without accepting msg0; the loop below expects it again
+
+        AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+        receiver2.flow(200);
+        for (int i = 0; i < MSG_COUNT; ++i) {
+            received = receiver2.receive(5, TimeUnit.SECONDS); // receiver1 is closed; read from receiver2
+            assertEquals("msg" + i, received.getMessageId());
+        }
+
+        receiver2.close();
+        connection.close();
+    }
+}