You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/24 22:17:56 UTC
[1/4] git commit: Add destination accessor
Repository: qpid-jms
Updated Branches:
refs/heads/master 5c483dbcb -> bdc47a55c
Add destination accessor
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8efad7c6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8efad7c6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8efad7c6
Branch: refs/heads/master
Commit: 8efad7c651892fef1cd14a910ec2457980722eac
Parents: 5c483db
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 24 14:19:15 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 24 14:19:43 2014 -0400
----------------------------------------------------------------------
.../java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8efad7c6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 3ecb58e..28b989c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -30,8 +30,8 @@ import org.apache.qpid.jms.meta.JmsConsumerId;
import org.apache.qpid.jms.meta.JmsConsumerInfo;
import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderListener;
import org.apache.qpid.jms.util.IOExceptionSupport;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.DescribedType;
@@ -363,6 +363,10 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
return this.info.getConsumerId();
}
+ public JmsDestination getDestination() {
+ return this.info.getDestination();
+ }
+
public Receiver getProtonReceiver() {
return this.endpoint;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[3/4] git commit: Add some notes
Posted by ta...@apache.org.
Add some notes
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/459c90a8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/459c90a8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/459c90a8
Branch: refs/heads/master
Commit: 459c90a89aae32a4d9ffade62157fbd7e03e6568
Parents: eae4583
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 24 16:12:40 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 24 16:12:40 2014 -0400
----------------------------------------------------------------------
.../jms/provider/amqp/message/AmqpDestinationHelper.java | 10 ++++++++++
1 file changed, 10 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/459c90a8/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
index 9d7445a..350aeb4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -48,6 +48,16 @@ public class AmqpDestinationHelper {
// TODO - The Type Annotation seems like it could just be a byte value
// TODO - How do we deal with the case where no type is present?
+ /*
+ * One possible way to encode destination types that isn't a string.
+ *
+ * public static final byte QUEUE_TYPE = 0x01;
+ * public static final byte TOPIC_TYPE = 0x02;
+ * public static final byte TEMP_MASK = 0x04;
+ * public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
+ * public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
+ */
+
/**
* Decode the provided To address, type description, and consumer destination
* information such that an appropriate Destination object can be returned.
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[2/4] git commit: Implement get / set of JMS Destination from the
AMQP message facade
Posted by ta...@apache.org.
Implement get / set of JMS Destination from the AMQP message facade
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/eae45830
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/eae45830
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/eae45830
Branch: refs/heads/master
Commit: eae45830c04e88823f6d3e38f1cc8ece891ae664
Parents: 8efad7c
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 24 16:06:58 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 24 16:06:58 2014 -0400
----------------------------------------------------------------------
.../qpid/jms/provider/amqp/AmqpConsumer.java | 5 +
.../amqp/message/AmqpDestinationHelper.java | 200 +++++++++++++++++++
.../amqp/message/AmqpJmsBytesMessageFacade.java | 10 +-
.../amqp/message/AmqpJmsMapMessageFacade.java | 11 +-
.../amqp/message/AmqpJmsMessageBuilder.java | 37 ++--
.../amqp/message/AmqpJmsMessageFacade.java | 38 ++--
.../message/AmqpJmsObjectMessageFacade.java | 15 +-
.../message/AmqpJmsStreamMessageFacade.java | 11 +-
.../amqp/message/AmqpJmsTextMessageFacade.java | 11 +-
.../amqp/message/AmqpMessageSupport.java | 24 +++
.../amqp/message/AmqpJmsMessageFacadeTest.java | 22 +-
11 files changed, 315 insertions(+), 69 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 28b989c..1f51b46 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -355,6 +355,11 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
protected void doClose() {
}
+
+ public AmqpConnection getConnection() {
+ return this.session.getConnection();
+ }
+
public AmqpSession getSession() {
return this.session;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
new file mode 100644
index 0000000..9d7445a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsQueue;
+import org.apache.qpid.jms.JmsTemporaryQueue;
+import org.apache.qpid.jms.JmsTemporaryTopic;
+import org.apache.qpid.jms.JmsTopic;
+
+/**
+ * A set of static utility method useful when mapping JmsDestination types to / from the AMQP
+ * destination fields in a Message that's being sent or received.
+ */
+public class AmqpDestinationHelper {
+
+ public static final AmqpDestinationHelper INSTANCE = new AmqpDestinationHelper();
+
+ public static final String TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-to-type";
+ public static final String REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-reply-type";
+
+ static final String QUEUE_ATTRIBUTE = "queue";
+ static final String TOPIC_ATTRIBUTE = "topic";
+ static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+ public static final String QUEUE_ATTRIBUTES_STRING = QUEUE_ATTRIBUTE;
+ public static final String TOPIC_ATTRIBUTES_STRING = TOPIC_ATTRIBUTE;
+ public static final String TEMP_QUEUE_ATTRIBUTES_STRING = QUEUE_ATTRIBUTE + "," + TEMPORARY_ATTRIBUTE;
+ public static final String TEMP_TOPIC_ATTRIBUTES_STRING = TOPIC_ATTRIBUTE + "," + TEMPORARY_ATTRIBUTE;
+
+ // TODO - The Type Annotation seems like it could just be a byte value
+ // TODO - How do we deal with the case where no type is present?
+
+ /**
+ * Decode the provided To address, type description, and consumer destination
+ * information such that an appropriate Destination object can be returned.
+ *
+ * If an address and type description is provided then this will be used to
+ * create the Destination. If the type information is missing, it will be
+ * derived from the consumer destination if present, or default to a generic
+ * destination if not.
+ *
+ * If the address is null then the consumer destination is returned, unless
+ * the useConsumerDestForTypeOnly flag is true, in which case null will be
+ * returned.
+ */
+
+ public JmsDestination buildJmsDestination(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
+ String to = message.getAmqpMessage().getAddress();
+ String toTypeString = (String) message.getAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ Set<String> typeSet = null;
+
+ if (toTypeString != null) {
+ typeSet = splitAttributes(toTypeString);
+ }
+
+ return createDestination(to, typeSet, consumerDestination, false);
+ }
+
+ public JmsDestination buildJmsReplyTo(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
+ String replyTo = message.getAmqpMessage().getReplyTo();
+ String replyToTypeString = (String) message.getAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ Set<String> typeSet = null;
+
+ if (replyToTypeString != null) {
+ typeSet = splitAttributes(replyToTypeString);
+ }
+
+ return createDestination(replyTo, typeSet, consumerDestination, true);
+ }
+
+ private JmsDestination createDestination(String address, Set<String> typeSet, JmsDestination consumerDestination, boolean useConsumerDestForTypeOnly) {
+ if (address == null) {
+ return useConsumerDestForTypeOnly ? null : consumerDestination;
+ }
+
+ if (typeSet != null && !typeSet.isEmpty()) {
+ if (typeSet.contains(QUEUE_ATTRIBUTE)) {
+ if (typeSet.contains(TEMPORARY_ATTRIBUTE)) {
+ return new JmsTemporaryQueue(address);
+ } else {
+ return new JmsQueue(address);
+ }
+ } else if (typeSet.contains(TOPIC_ATTRIBUTE)) {
+ if (typeSet.contains(TEMPORARY_ATTRIBUTE)) {
+ return new JmsTemporaryTopic(address);
+ } else {
+ return new JmsTopic(address);
+ }
+ }
+ }
+
+ if (consumerDestination.isQueue()) {
+ if (consumerDestination.isTemporary()) {
+ return new JmsTemporaryQueue(address);
+ } else {
+ return new JmsQueue(address);
+ }
+ } else if (consumerDestination.isTopic()) {
+ if (consumerDestination.isTemporary()) {
+ return new JmsTemporaryTopic(address);
+ } else {
+ return new JmsTopic(address);
+ }
+ }
+
+ // fall back to a straight Destination
+ // TODO - We don't have a non-abstract destination to create right now
+ // and JMS doesn't really define a true non Topic / Queue destination
+ // so how this would be handled elsewhere seems a mystery.
+ return null;
+ }
+
+ public void setToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) {
+ String address = destination.getName();
+ String typeString = toTypeAnnotation(destination);
+
+ message.getAmqpMessage().setAddress(address);
+
+ if (address == null || typeString == null) {
+ message.removeAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ } else {
+ message.setAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString);
+ }
+ }
+
+ public void setReplyToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) {
+ String replyToAddress = destination.getName();
+ String typeString = toTypeAnnotation(destination);
+
+ message.getAmqpMessage().setReplyTo(replyToAddress);
+
+ if (replyToAddress == null || typeString == null) {
+ message.removeAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+ } else {
+ message.setAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString);
+ }
+ }
+
+ /**
+ * @return the annotation type string, or null if the supplied destination
+ * is null or can't be classified
+ */
+ private String toTypeAnnotation(JmsDestination destination) {
+ if (destination == null) {
+ return null;
+ }
+
+ if (destination.isQueue()) {
+ if (destination.isTemporary()) {
+ return TEMP_QUEUE_ATTRIBUTES_STRING;
+ } else {
+ return QUEUE_ATTRIBUTES_STRING;
+ }
+ } else if (destination.isTopic()) {
+ if (destination.isTemporary()) {
+ return TEMP_TOPIC_ATTRIBUTES_STRING;
+ } else {
+ return TOPIC_ATTRIBUTES_STRING;
+ }
+ }
+
+ return null;
+ }
+
+ public Set<String> splitAttributes(String typeString) {
+ if (typeString == null) {
+ return null;
+ }
+
+ HashSet<String> typeSet = new HashSet<String>();
+
+ // Split string on commas and their surrounding whitespace
+ for (String attr : typeString.split("\\s*,\\s*")) {
+ // ignore empty values
+ if (!attr.equals("")) {
+ typeSet.add(attr);
+ }
+ }
+
+ return typeSet;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
index 1cefc2a..664d414 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
@@ -21,6 +21,7 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_M
import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
@@ -42,6 +43,7 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
* Creates a new facade instance
*
* @param connection
+ * the AmqpConnection that under which this facade was created.
*/
public AmqpJmsBytesMessageFacade(AmqpConnection connection) {
super(connection);
@@ -52,13 +54,13 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
* Creates a new Facade around an incoming AMQP Message for dispatch to the
* JMS Consumer instance.
*
- * @param connection
- * the connection that created this Facade.
+ * @param consumer
+ * the consumer that received this message.
* @param message
* the incoming Message instance that is being wrapped.
*/
- public AmqpJmsBytesMessageFacade(AmqpConnection connection, Message message) {
- super(connection, message);
+ public AmqpJmsBytesMessageFacade(AmqpConsumer consumer, Message message) {
+ super(consumer, message);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
index 511d1e3..fdda9e4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
@@ -27,6 +27,7 @@ import java.util.Map;
import org.apache.qpid.jms.message.facade.JmsMapMessageFacade;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
@@ -44,7 +45,7 @@ public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements Jms
* Create a new facade ready for sending.
*
* @param connection
- * the connection instance that created this facade.
+ * the AmqpConnection that under which this facade was created.
*/
public AmqpJmsMapMessageFacade(AmqpConnection connection) {
super(connection);
@@ -56,14 +57,14 @@ public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements Jms
* Creates a new Facade around an incoming AMQP Message for dispatch to the
* JMS Consumer instance.
*
- * @param connection
- * the connection that created this Facade.
+ * @param consumer
+ * the consumer that received this message.
* @param message
* the incoming Message instance that is being wrapped.
*/
@SuppressWarnings("unchecked")
- public AmqpJmsMapMessageFacade(AmqpConnection connection, Message message) {
- super(connection, message);
+ public AmqpJmsMapMessageFacade(AmqpConsumer consumer, Message message) {
+ super(consumer, message);
Section body = getAmqpMessage().getBody();
if (body == null) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
index c31c50b..a09cee2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
@@ -25,7 +25,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_S
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
import java.io.IOException;
-import java.util.Map;
import org.apache.qpid.jms.message.JmsBytesMessage;
import org.apache.qpid.jms.message.JmsMapMessage;
@@ -33,8 +32,7 @@ import org.apache.qpid.jms.message.JmsMessage;
import org.apache.qpid.jms.message.JmsObjectMessage;
import org.apache.qpid.jms.message.JmsStreamMessage;
import org.apache.qpid.jms.message.JmsTextMessage;
-import org.apache.qpid.jms.provider.amqp.AmqpConnection;
-import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.message.Message;
/**
@@ -50,8 +48,8 @@ public class AmqpJmsMessageBuilder {
* Create a new JmsMessage and underlying JmsMessageFacade that represents the proper
* message type for the incoming AMQP message.
*
- * @param connection
- * The provider AMQP Connection instance where this message arrived at.
+ * @param consumer
+ * The provider AMQP Consumer instance where this message arrived at.
* @param message
* The Proton Message object that will be wrapped.
*
@@ -59,10 +57,10 @@ public class AmqpJmsMessageBuilder {
*
* @throws IOException if an error occurs while creating the message objects.
*/
- public static JmsMessage createJmsMessage(AmqpConnection connection, Message message) throws IOException {
+ public static JmsMessage createJmsMessage(AmqpConsumer consumer, Message message) throws IOException {
// First we try the easy way, if the annotation is there we don't have to work hard.
- JmsMessage result = createFromMsgAnnotation(connection, message);
+ JmsMessage result = createFromMsgAnnotation(consumer, message);
if (result != null) {
return result;
}
@@ -71,23 +69,23 @@ public class AmqpJmsMessageBuilder {
throw new IOException("Could not create a JMS message from incoming message");
}
- private static JmsMessage createFromMsgAnnotation(AmqpConnection connection, Message message) throws IOException {
- Object annotation = getMessageAnnotation(JMS_MSG_TYPE, message);
+ private static JmsMessage createFromMsgAnnotation(AmqpConsumer consumer, Message message) throws IOException {
+ Object annotation = AmqpMessageSupport.getMessageAnnotation(JMS_MSG_TYPE, message);
if (annotation != null) {
switch ((byte) annotation) {
case JMS_MESSAGE:
- return new JmsMessage(new AmqpJmsMessageFacade(connection, message));
+ return new JmsMessage(new AmqpJmsMessageFacade(consumer, message));
case JMS_BYTES_MESSAGE:
- return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection, message));
+ return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(consumer, message));
case JMS_TEXT_MESSAGE:
- return new JmsTextMessage(new AmqpJmsTextMessageFacade(connection, message));
+ return new JmsTextMessage(new AmqpJmsTextMessageFacade(consumer, message));
case JMS_MAP_MESSAGE:
- return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection, message));
+ return new JmsMapMessage(new AmqpJmsMapMessageFacade(consumer, message));
case JMS_STREAM_MESSAGE:
- return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection, message));
+ return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(consumer, message));
case JMS_OBJECT_MESSAGE:
- return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(connection, message));
+ return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(consumer, message));
default:
throw new IOException("Invalid JMS Message Type annotation found in message");
}
@@ -95,13 +93,4 @@ public class AmqpJmsMessageBuilder {
return null;
}
-
- private static Object getMessageAnnotation(String key, Message message) {
- if (message.getMessageAnnotations() != null) {
- Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
- return annotations.get(AmqpMessageSupport.getSymbol(key));
- }
-
- return null;
- }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index 4937000..acd3012 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -37,6 +37,7 @@ import org.apache.qpid.jms.exceptions.IdConversionException;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -74,6 +75,9 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
/**
* Create a new AMQP Message Facade with an empty message instance.
+ *
+ * @param connection
+ * the AmqpConnection that under which this facade was created.
*/
public AmqpJmsMessageFacade(AmqpConnection connection) {
this.message = Proton.message();
@@ -87,15 +91,15 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
* Creates a new Facade around an incoming AMQP Message for dispatch to the
* JMS Consumer instance.
*
- * @param connection
- * the connection that created this Facade.
+ * @param consumer
+ * the consumer that received this message.
* @param message
* the incoming Message instance that is being wrapped.
*/
@SuppressWarnings("unchecked")
- public AmqpJmsMessageFacade(AmqpConnection connection, Message message) {
+ public AmqpJmsMessageFacade(AmqpConsumer consumer, Message message) {
this.message = message;
- this.connection = connection;
+ this.connection = consumer.getConnection();
annotations = message.getMessageAnnotations();
if (annotations != null) {
@@ -112,8 +116,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
syntheticTTL = System.currentTimeMillis() + ttl;
}
- // TODO - Set destination
- // TODO - Set replyTo
+ this.destination = AmqpDestinationHelper.INSTANCE.buildJmsDestination(this, consumer.getDestination());
+ this.replyTo = AmqpDestinationHelper.INSTANCE.buildJmsReplyTo(this, consumer.getDestination());
}
/**
@@ -223,7 +227,6 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public void clearProperties() {
- clearProperties();
//_propJMS_AMQP_TTL = null;
message.setReplyToGroupId(null);
message.setUserId(null);
@@ -235,7 +238,7 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public JmsMessageFacade copy() throws JMSException {
- AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection, message);
+ AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection);
copyInto(copy);
return copy;
}
@@ -250,12 +253,12 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
AmqpMessageIdHelper helper = AmqpMessageIdHelper.INSTANCE;
String baseStringId = helper.toBaseMessageIdString(underlying);
- //Ensure the ID: prefix is present.
- //TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix.
- //TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities?
- // I Ended up putting it in JmsMessage after the above comment, as a workaround for the current JmsDefaultMessageFacade usage.
- if(baseStringId != null && !helper.hasMessageIdPrefix(baseStringId))
- {
+ // Ensure the ID: prefix is present.
+ // TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix.
+ // TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities?
+ // I Ended up putting it in JmsMessage after the above comment, as a workaround for the
+ // current JmsDefaultMessageFacade usage.
+ if (baseStringId != null && !helper.hasMessageIdPrefix(baseStringId)) {
baseStringId = AmqpMessageIdHelper.JMS_ID_PREFIX + baseStringId;
}
return new JmsMessageId(baseStringId);
@@ -514,8 +517,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public void setDestination(JmsDestination destination) {
this.destination = destination;
-
- // TODO
+ lazyCreateAnnotations();
+ AmqpDestinationHelper.INSTANCE.setToAddressFromDestination(this, destination);
}
@Override
@@ -526,7 +529,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public void setReplyTo(JmsDestination replyTo) {
this.replyTo = replyTo;
- // TODO Auto-generated method stub
+ lazyCreateAnnotations();
+ AmqpDestinationHelper.INSTANCE.setReplyToAddressFromDestination(this, replyTo);
}
public void setReplyToGroupId(String replyToGroupId) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
index 3696653..1668a6c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -27,6 +27,7 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.message.Message;
/**
@@ -38,7 +39,10 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
private AmqpObjectTypeDelegate delegate;
/**
+ * Creates a new facade instance
+ *
* @param connection
+ * the AmqpConnection that under which this facade was created.
*/
public AmqpJmsObjectMessageFacade(AmqpConnection connection) {
super(connection);
@@ -49,11 +53,16 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
}
/**
- * @param connection
+ * Creates a new Facade around an incoming AMQP Message for dispatch to the
+ * JMS Consumer instance.
+ *
+ * @param consumer
+ * the consumer that received this message.
* @param message
+ * the incoming Message instance that is being wrapped.
*/
- public AmqpJmsObjectMessageFacade(AmqpConnection connection, Message message) {
- super(connection, message);
+ public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message) {
+ super(consumer, message);
// TODO detect the content type and init the proper delegate.
initDelegate(false);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
index 0999225..90cd97e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
@@ -27,6 +27,7 @@ import javax.jms.MessageEOFException;
import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Section;
@@ -45,7 +46,7 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
* Create a new facade ready for sending.
*
* @param connection
- * the connection instance that created this facade.
+ * the AmqpConnection that under which this facade was created.
*/
public AmqpJmsStreamMessageFacade(AmqpConnection connection) {
super(connection);
@@ -57,14 +58,14 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
* Creates a new Facade around an incoming AMQP Message for dispatch to the
* JMS Consumer instance.
*
- * @param connection
- * the connection that created this Facade.
+ * @param consumer
+ * the consumer that received this message.
* @param message
* the incoming Message instance that is being wrapped.
*/
@SuppressWarnings("unchecked")
- public AmqpJmsStreamMessageFacade(AmqpConnection connection, Message message) {
- super(connection, message);
+ public AmqpJmsStreamMessageFacade(AmqpConsumer consumer, Message message) {
+ super(consumer, message);
Section body = getAmqpMessage().getBody();
if (body == null) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
index 6c2421b..c2d6fc2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
@@ -30,6 +30,7 @@ import javax.jms.JMSException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.messaging.AmqpValue;
import org.apache.qpid.proton.amqp.messaging.Data;
@@ -56,7 +57,7 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
* Create a new AMQP Message facade ready for sending.
*
* @param connection
- * The AMQP Connection that created this message.
+ * the AmqpConnection that under which this facade was created.
*/
public AmqpJmsTextMessageFacade(AmqpConnection connection) {
super(connection);
@@ -68,13 +69,13 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
* Creates a new Facade around an incoming AMQP Message for dispatch to the
* JMS Consumer instance.
*
- * @param connection
- * the connection that created this Facade.
+ * @param consumer
+ * the consumer that received this message.
* @param message
* the incoming Message instance that is being wrapped.
*/
- public AmqpJmsTextMessageFacade(AmqpConnection connection, Message message) {
- super(connection, message);
+ public AmqpJmsTextMessageFacade(AmqpConsumer consumer, Message message) {
+ super(consumer, message);
}
/**
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index a01d415..563352b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
+import java.util.Map;
+
import javax.jms.Destination;
import javax.jms.Queue;
import javax.jms.TemporaryQueue;
@@ -23,6 +25,7 @@ import javax.jms.TemporaryTopic;
import javax.jms.Topic;
import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.message.Message;
/**
* Support class containing constant values and static methods that are
@@ -166,4 +169,25 @@ public final class AmqpMessageSupport {
return null;
}
+
+ /**
+ * Safe way to access message annotations which will check internal structure and
+ * either return the annotation if it exists or null if the annotation or any annotations
+ * are present.
+ *
+ * @param key
+ * the String key to use to lookup an annotation.
+ * @param message
+ * the AMQP message object that is being examined.
+ *
+ * @return the given annotation value or null if not present in the message.
+ */
+ public static Object getMessageAnnotation(String key, Message message) {
+ if (message != null && message.getMessageAnnotations() != null) {
+ Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+ return annotations.get(AmqpMessageSupport.getSymbol(key));
+ }
+
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index a276c7f..af68cf3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.jms.provider.amqp.message;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.HashMap;
@@ -29,6 +32,7 @@ import java.util.UUID;
import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Binary;
import org.apache.qpid.proton.amqp.Symbol;
@@ -36,7 +40,6 @@ import org.apache.qpid.proton.amqp.UnsignedLong;
import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
import org.apache.qpid.proton.amqp.messaging.Properties;
import org.apache.qpid.proton.message.Message;
-import org.junit.Ignore;
import org.junit.Test;
import org.mockito.Mockito;
@@ -46,8 +49,15 @@ public class AmqpJmsMessageFacadeTest {
return new AmqpJmsMessageFacade(createMockAmqpConnection());
}
- private AmqpJmsMessageFacade createReceivedMessageFacade(AmqpConnection amqpConnection, Message message) {
- return new AmqpJmsMessageFacade(amqpConnection, message);
+ private AmqpJmsMessageFacade createReceivedMessageFacade(AmqpConsumer amqpConsumer, Message message) {
+ return new AmqpJmsMessageFacade(amqpConsumer, message);
+ }
+
+
+ private AmqpConsumer createMockAmqpConsumer() {
+ AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class);
+ Mockito.when(consumer.getConnection()).thenReturn(createMockAmqpConnection());
+ return consumer;
}
private AmqpConnection createMockAmqpConnection() {
@@ -225,7 +235,7 @@ public class AmqpJmsMessageFacadeTest {
expected = AmqpMessageIdHelper.JMS_ID_PREFIX + expected;
}
- AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConnection(), message);
+ AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConsumer(), message);
assertNotNull("Expected a correlationId on received message", amqpMessageFacade.getCorrelationId());
@@ -305,7 +315,7 @@ public class AmqpJmsMessageFacadeTest {
props.setMessageId(underlyingIdObject);
message.setProperties(props);
- AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConnection(), message);
+ AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConsumer(), message);
assertNotNull("Expected a messageId on received message", amqpMessageFacade.getMessageId());
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org
[4/4] git commit: Add mock return for consumer.getDestination
Posted by ta...@apache.org.
Add mock return for consumer.getDestination
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bdc47a55
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bdc47a55
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bdc47a55
Branch: refs/heads/master
Commit: bdc47a55c467e7d9acfc8dd0f6db57563ba295ee
Parents: 459c90a
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 24 16:14:31 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 24 16:14:31 2014 -0400
----------------------------------------------------------------------
.../jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java | 5 +++++
1 file changed, 5 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bdc47a55/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index af68cf3..c1200fb 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -30,6 +30,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsTopic;
import org.apache.qpid.jms.meta.JmsMessageId;
import org.apache.qpid.jms.provider.amqp.AmqpConnection;
import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
@@ -45,6 +47,8 @@ import org.mockito.Mockito;
public class AmqpJmsMessageFacadeTest {
+ private final JmsDestination consumerDestination = new JmsTopic("TestTopic");
+
private AmqpJmsMessageFacade createNewMessageFacade() {
return new AmqpJmsMessageFacade(createMockAmqpConnection());
}
@@ -57,6 +61,7 @@ public class AmqpJmsMessageFacadeTest {
private AmqpConsumer createMockAmqpConsumer() {
AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class);
Mockito.when(consumer.getConnection()).thenReturn(createMockAmqpConnection());
+ Mockito.when(consumer.getDestination()).thenReturn(consumerDestination);
return consumer;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org