You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2015/03/27 22:31:52 UTC
activemq git commit: https://issues.apache.org/jira/browse/AMQ-5684
Repository: activemq
Updated Branches:
refs/heads/master 05ff52dc1 -> f56ea45e5
https://issues.apache.org/jira/browse/AMQ-5684
Adds a new test case to use when investigating AmqpNetLite test failures
with the 'JMS' transformer used on the AMQP TransportConnector.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/f56ea45e
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/f56ea45e
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/f56ea45e
Branch: refs/heads/master
Commit: f56ea45e58a17fa3aad46cbe8fc605ef4ffdbc81
Parents: 05ff52d
Author: Timothy Bish <ta...@gmail.com>
Authored: Fri Mar 27 17:31:46 2015 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Fri Mar 27 17:31:46 2015 -0400
----------------------------------------------------------------------
.../transport/amqp/client/AmqpMessage.java | 166 +++++++++++++++++--
.../amqp/interop/AmqpSendReceiveTest.java | 81 +++++++++
2 files changed, 234 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/f56ea45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
index 52e5eaf..9db12f9 100644
--- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpMessage.java
@@ -16,9 +16,16 @@
*/
package org.apache.activemq.transport.amqp.client;
+import java.util.HashMap;
+import java.util.Map;
+
import org.apache.activemq.transport.amqp.client.util.UnmodifiableDelivery;
import org.apache.qpid.proton.Proton;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.ApplicationProperties;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
+import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.engine.Delivery;
import org.apache.qpid.proton.message.Message;
@@ -28,6 +35,9 @@ public class AmqpMessage {
private final Message message;
private final Delivery delivery;
+ private Map<Symbol, Object> messageAnnotationsMap;
+ private Map<String, Object> applicationPropertiesMap;
+
/**
* Creates a new AmqpMessage that wraps the information necessary to handle
* an outgoing message.
@@ -62,13 +72,51 @@ public class AmqpMessage {
* @param delivery
* the Delivery instance that produced this message.
*/
+ @SuppressWarnings("unchecked")
public AmqpMessage(AmqpReceiver receiver, Message message, Delivery delivery) {
this.receiver = receiver;
this.message = message;
this.delivery = delivery;
+
+ if (message.getMessageAnnotations() != null) {
+ messageAnnotationsMap = message.getMessageAnnotations().getValue();
+ }
+
+ if (message.getApplicationProperties() != null) {
+ applicationPropertiesMap = message.getApplicationProperties().getValue();
+ }
+ }
+
+ //----- Access to interal client resources -------------------------------//
+
+ /**
+ * @return the AMQP Delivery object linked to a received message.
+ */
+ public Delivery getWrappedDelivery() {
+ if (delivery != null) {
+ return new UnmodifiableDelivery(delivery);
+ }
+
+ return null;
}
/**
+ * @return the AMQP Message that is wrapped by this object.
+ */
+ public Message getWrappedMessage() {
+ return message;
+ }
+
+ /**
+ * @return the AmqpReceiver that consumed this message.
+ */
+ public AmqpReceiver getAmqpReceiver() {
+ return receiver;
+ }
+
+ //----- Message disposition control --------------------------------------//
+
+ /**
* Accepts the message marking it as consumed on the remote peer.
*
* @throws Exception if an error occurs during the accept.
@@ -134,29 +182,96 @@ public class AmqpMessage {
receiver.release(delivery);
}
+ //----- Convenience methods for constructing outbound messages -----------//
+
/**
- * @return the AMQP Delivery object linked to a received message.
+ * Sets the MessageId property on an outbound message using the provided String
+ *
+ * @param messageId
+ * the String message ID value to set.
*/
- public Delivery getWrappedDelivery() {
- if (delivery != null) {
- return new UnmodifiableDelivery(delivery);
+ public void setMessageId(String messageId) {
+ checkReadOnly();
+ lazyCreateProperties();
+ getWrappedMessage().setMessageId(messageId);
+ }
+
+ /**
+ * Return the set MessageId value in String form, if there are no properties
+ * in the given message return null.
+ *
+ * @return the set message ID in String form or null if not set.
+ */
+ public String getMessageId() {
+ if (message.getProperties() == null) {
+ return null;
}
- return null;
+ return message.getProperties().getMessageId().toString();
}
/**
- * @return the AMQP Message that is wrapped by this object.
+ * Sets a given application property on an outbound message.
+ *
+ * @param key
+ * the name to assign the new property.
+ * @param value
+ * the value to set for the named property.
*/
- public Message getWrappedMessage() {
- return message;
+ public void setApplicationProperty(String key, Object value) {
+ checkReadOnly();
+ lazyCreateApplicationProperties();
+ applicationPropertiesMap.put(key, value);
}
/**
- * @return the AmqpReceiver that consumed this message.
+ * Gets the application property that is mapped to the given name or null
+ * if no property has been set with that name.
+ *
+ * @param key
+ * the name used to lookup the property in the application properties.
+ *
+ * @return the propety value or null if not set.
*/
- public AmqpReceiver getAmqpReceiver() {
- return receiver;
+ public Object getApplicationProperty(String key) {
+ if (applicationPropertiesMap == null) {
+ return null;
+ }
+
+ return applicationPropertiesMap.get(key);
+ }
+
+ /**
+ * Perform a proper annotation set on the AMQP Message based on a Symbol key and
+ * the target value to append to the current annotations.
+ *
+ * @param key
+ * The name of the Symbol whose value is being set.
+ * @param value
+ * The new value to set in the annotations of this message.
+ */
+ public void setMessageAnnotation(String key, Object value) {
+ checkReadOnly();
+ lazyCreateMessageAnnotations();
+ messageAnnotationsMap.put(Symbol.valueOf(key), value);
+ }
+
+ /**
+ * Given a message annotation name, lookup and return the value associated with
+ * that annotation name. If the message annotations have not been created yet
+ * then this method will always return null.
+ *
+ * @param key
+ * the Symbol name that should be looked up in the message annotations.
+ *
+ * @return the value of the annotation if it exists, or null if not set or not accessible.
+ */
+ public Object getMessageAnnotation(String key) {
+ if (messageAnnotationsMap == null) {
+ return null;
+ }
+
+ return messageAnnotationsMap.get(Symbol.valueOf(key));
}
/**
@@ -169,11 +284,36 @@ public class AmqpMessage {
* @throws IllegalStateException if the message is read only.
*/
public void setText(String value) throws IllegalStateException {
+ checkReadOnly();
+ AmqpValue body = new AmqpValue(value);
+ getWrappedMessage().setBody(body);
+ }
+
+ //----- Internal implementation ------------------------------------------//
+
+ private void checkReadOnly() throws IllegalStateException {
if (delivery != null) {
throw new IllegalStateException("Message is read only.");
}
+ }
- AmqpValue body = new AmqpValue(value);
- getWrappedMessage().setBody(body);
+ private void lazyCreateMessageAnnotations() {
+ if (messageAnnotationsMap == null) {
+ messageAnnotationsMap = new HashMap<Symbol,Object>();
+ message.setMessageAnnotations(new MessageAnnotations(messageAnnotationsMap));
+ }
+ }
+
+ private void lazyCreateApplicationProperties() {
+ if (applicationPropertiesMap == null) {
+ applicationPropertiesMap = new HashMap<String, Object>();
+ message.setApplicationProperties(new ApplicationProperties(applicationPropertiesMap));
+ }
+ }
+
+ private void lazyCreateProperties() {
+ if (message.getProperties() == null) {
+ message.setProperties(new Properties());
+ }
}
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/f56ea45e/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
new file mode 100644
index 0000000..b16ef59
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpSendReceiveTest.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.interop;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.broker.jmx.QueueViewMBean;
+import org.apache.activemq.transport.amqp.client.AmqpClient;
+import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport;
+import org.apache.activemq.transport.amqp.client.AmqpConnection;
+import org.apache.activemq.transport.amqp.client.AmqpMessage;
+import org.apache.activemq.transport.amqp.client.AmqpReceiver;
+import org.apache.activemq.transport.amqp.client.AmqpSender;
+import org.apache.activemq.transport.amqp.client.AmqpSession;
+import org.junit.Ignore;
+import org.junit.Test;
+
+/**
+ * Test basic send and receive scenarios using only AMQP sender and receiver links.
+ */
+public class AmqpSendReceiveTest extends AmqpClientTestSupport {
+
+ @Ignore("Test fails when JMS transformer is in play")
+ @Test(timeout = 60000)
+ public void testCloseBusyReceiver() throws Exception {
+ final int MSG_COUNT = 20;
+
+ AmqpClient client = createAmqpClient();
+ AmqpConnection connection = client.connect();
+ AmqpSession session = connection.createSession();
+
+ AmqpSender sender = session.createSender("queue://" + getTestName());
+
+ for (int i = 0; i < MSG_COUNT; i++) {
+ AmqpMessage message = new AmqpMessage();
+
+ message.setMessageId("msg" + i);
+ message.setMessageAnnotation("serialNo", i);
+ message.setText("Test-Message");
+
+ sender.send(message);
+ }
+
+ sender.close();
+
+ QueueViewMBean queue = getProxyToQueue(getTestName());
+ assertEquals(20, queue.getQueueSize());
+
+ AmqpReceiver receiver1 = session.createReceiver("queue://" + getTestName());
+ receiver1.flow(MSG_COUNT);
+ AmqpMessage received = receiver1.receive(5, TimeUnit.SECONDS);
+ assertEquals("msg0", received.getMessageId());
+ receiver1.close();
+
+ AmqpReceiver receiver2 = session.createReceiver("queue://" + getTestName());
+ receiver2.flow(200);
+ for (int i = 0; i < MSG_COUNT; ++i) {
+ received = receiver1.receive(5, TimeUnit.SECONDS);
+ assertEquals("msg" + i, received.getMessageId());
+ }
+
+ receiver2.close();
+ connection.close();
+ }
+}