You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/24 22:17:56 UTC

[1/4] git commit: Add destination accessor

Repository: qpid-jms
Updated Branches:
  refs/heads/master 5c483dbcb -> bdc47a55c


Add destination accessor

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8efad7c6
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8efad7c6
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8efad7c6

Branch: refs/heads/master
Commit: 8efad7c651892fef1cd14a910ec2457980722eac
Parents: 5c483db
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 24 14:19:15 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 24 14:19:43 2014 -0400

----------------------------------------------------------------------
 .../java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8efad7c6/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 3ecb58e..28b989c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -30,8 +30,8 @@ import org.apache.qpid.jms.meta.JmsConsumerId;
 import org.apache.qpid.jms.meta.JmsConsumerInfo;
 import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.provider.AsyncResult;
-import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.util.IOExceptionSupport;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
@@ -363,6 +363,10 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
         return this.info.getConsumerId();
     }
 
+    public JmsDestination getDestination() {
+        return this.info.getDestination();
+    }
+
     public Receiver getProtonReceiver() {
         return this.endpoint;
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[3/4] git commit: Add some notes

Posted by ta...@apache.org.
Add some notes

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/459c90a8
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/459c90a8
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/459c90a8

Branch: refs/heads/master
Commit: 459c90a89aae32a4d9ffade62157fbd7e03e6568
Parents: eae4583
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 24 16:12:40 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 24 16:12:40 2014 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/message/AmqpDestinationHelper.java  | 10 ++++++++++
 1 file changed, 10 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/459c90a8/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
index 9d7445a..350aeb4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -48,6 +48,16 @@ public class AmqpDestinationHelper {
     // TODO - The Type Annotation seems like it could just be a byte value
     // TODO - How do we deal with the case where no type is present?
 
+    /*
+     *  One possible way to encode destination types that isn't a string.
+     *
+     *  public static final byte QUEUE_TYPE = 0x01;
+     *  public static final byte TOPIC_TYPE = 0x02;
+     *  public static final byte TEMP_MASK = 0x04;
+     *  public static final byte TEMP_TOPIC_TYPE = TOPIC_TYPE | TEMP_MASK;
+     *  public static final byte TEMP_QUEUE_TYPE = QUEUE_TYPE | TEMP_MASK;
+     */
+
     /**
      * Decode the provided To address, type description, and consumer destination
      * information such that an appropriate Destination object can be returned.


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[2/4] git commit: Implement get / set of JMS Destination from the AMQP message facade

Posted by ta...@apache.org.
Implement get / set of JMS Destination from the AMQP message facade 

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/eae45830
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/eae45830
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/eae45830

Branch: refs/heads/master
Commit: eae45830c04e88823f6d3e38f1cc8ece891ae664
Parents: 8efad7c
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 24 16:06:58 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 24 16:06:58 2014 -0400

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpConsumer.java    |   5 +
 .../amqp/message/AmqpDestinationHelper.java     | 200 +++++++++++++++++++
 .../amqp/message/AmqpJmsBytesMessageFacade.java |  10 +-
 .../amqp/message/AmqpJmsMapMessageFacade.java   |  11 +-
 .../amqp/message/AmqpJmsMessageBuilder.java     |  37 ++--
 .../amqp/message/AmqpJmsMessageFacade.java      |  38 ++--
 .../message/AmqpJmsObjectMessageFacade.java     |  15 +-
 .../message/AmqpJmsStreamMessageFacade.java     |  11 +-
 .../amqp/message/AmqpJmsTextMessageFacade.java  |  11 +-
 .../amqp/message/AmqpMessageSupport.java        |  24 +++
 .../amqp/message/AmqpJmsMessageFacadeTest.java  |  22 +-
 11 files changed, 315 insertions(+), 69 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
index 28b989c..1f51b46 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java
@@ -355,6 +355,11 @@ public class AmqpConsumer extends AbstractAmqpResource<JmsConsumerInfo, Receiver
     protected void doClose() {
     }
 
+
+    public AmqpConnection getConnection() {
+        return this.session.getConnection();
+    }
+
     public AmqpSession getSession() {
         return this.session;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
new file mode 100644
index 0000000..9d7445a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpDestinationHelper.java
@@ -0,0 +1,200 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.provider.amqp.message;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsQueue;
+import org.apache.qpid.jms.JmsTemporaryQueue;
+import org.apache.qpid.jms.JmsTemporaryTopic;
+import org.apache.qpid.jms.JmsTopic;
+
+/**
+ * A set of static utility method useful when mapping JmsDestination types to / from the AMQP
+ * destination fields in a Message that's being sent or received.
+ */
+public class AmqpDestinationHelper {
+
+    public static final AmqpDestinationHelper INSTANCE = new AmqpDestinationHelper();
+
+    public static final String TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-to-type";
+    public static final String REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME = "x-opt-reply-type";
+
+    static final String QUEUE_ATTRIBUTE = "queue";
+    static final String TOPIC_ATTRIBUTE = "topic";
+    static final String TEMPORARY_ATTRIBUTE = "temporary";
+
+    public static final String QUEUE_ATTRIBUTES_STRING = QUEUE_ATTRIBUTE;
+    public static final String TOPIC_ATTRIBUTES_STRING = TOPIC_ATTRIBUTE;
+    public static final String TEMP_QUEUE_ATTRIBUTES_STRING = QUEUE_ATTRIBUTE + "," + TEMPORARY_ATTRIBUTE;
+    public static final String TEMP_TOPIC_ATTRIBUTES_STRING = TOPIC_ATTRIBUTE + "," + TEMPORARY_ATTRIBUTE;
+
+    // TODO - The Type Annotation seems like it could just be a byte value
+    // TODO - How do we deal with the case where no type is present?
+
+    /**
+     * Decode the provided To address, type description, and consumer destination
+     * information such that an appropriate Destination object can be returned.
+     *
+     * If an address and type description is provided then this will be used to
+     * create the Destination. If the type information is missing, it will be
+     * derived from the consumer destination if present, or default to a generic
+     * destination if not.
+     *
+     * If the address is null then the consumer destination is returned, unless
+     * the useConsumerDestForTypeOnly flag is true, in which case null will be
+     * returned.
+     */
+
+    public JmsDestination buildJmsDestination(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
+        String to = message.getAmqpMessage().getAddress();
+        String toTypeString = (String) message.getAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        Set<String> typeSet = null;
+
+        if (toTypeString != null) {
+            typeSet = splitAttributes(toTypeString);
+        }
+
+        return createDestination(to, typeSet, consumerDestination, false);
+    }
+
+    public JmsDestination buildJmsReplyTo(AmqpJmsMessageFacade message, JmsDestination consumerDestination) {
+        String replyTo = message.getAmqpMessage().getReplyTo();
+        String replyToTypeString = (String) message.getAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        Set<String> typeSet = null;
+
+        if (replyToTypeString != null) {
+            typeSet = splitAttributes(replyToTypeString);
+        }
+
+        return createDestination(replyTo, typeSet, consumerDestination, true);
+    }
+
+    private JmsDestination createDestination(String address, Set<String> typeSet, JmsDestination consumerDestination, boolean useConsumerDestForTypeOnly) {
+        if (address == null) {
+            return useConsumerDestForTypeOnly ? null : consumerDestination;
+        }
+
+        if (typeSet != null && !typeSet.isEmpty()) {
+            if (typeSet.contains(QUEUE_ATTRIBUTE)) {
+                if (typeSet.contains(TEMPORARY_ATTRIBUTE)) {
+                    return new JmsTemporaryQueue(address);
+                } else {
+                    return new JmsQueue(address);
+                }
+            } else if (typeSet.contains(TOPIC_ATTRIBUTE)) {
+                if (typeSet.contains(TEMPORARY_ATTRIBUTE)) {
+                    return new JmsTemporaryTopic(address);
+                } else {
+                    return new JmsTopic(address);
+                }
+            }
+        }
+
+        if (consumerDestination.isQueue()) {
+            if (consumerDestination.isTemporary()) {
+                return new JmsTemporaryQueue(address);
+            } else {
+                return new JmsQueue(address);
+            }
+        } else if (consumerDestination.isTopic()) {
+            if (consumerDestination.isTemporary()) {
+                return new JmsTemporaryTopic(address);
+            } else {
+                return new JmsTopic(address);
+            }
+        }
+
+        // fall back to a straight Destination
+        // TODO - We don't have a non-abstract destination to create right now
+        //        and JMS doesn't really define a true non Topic / Queue destination
+        //        so how this would be handled elsewhere seems a mystery.
+        return null;
+    }
+
+    public void setToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) {
+        String address = destination.getName();
+        String typeString = toTypeAnnotation(destination);
+
+        message.getAmqpMessage().setAddress(address);
+
+        if (address == null || typeString == null) {
+            message.removeAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        } else {
+            message.setAnnotation(TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString);
+        }
+    }
+
+    public void setReplyToAddressFromDestination(AmqpJmsMessageFacade message, JmsDestination destination) {
+        String replyToAddress = destination.getName();
+        String typeString = toTypeAnnotation(destination);
+
+        message.getAmqpMessage().setReplyTo(replyToAddress);
+
+        if (replyToAddress == null || typeString == null) {
+            message.removeAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME);
+        } else {
+            message.setAnnotation(REPLY_TO_TYPE_MSG_ANNOTATION_SYMBOL_NAME, typeString);
+        }
+    }
+
+    /**
+     * @return the annotation type string, or null if the supplied destination
+     *         is null or can't be classified
+     */
+    private String toTypeAnnotation(JmsDestination destination) {
+        if (destination == null) {
+            return null;
+        }
+
+        if (destination.isQueue()) {
+            if (destination.isTemporary()) {
+                return TEMP_QUEUE_ATTRIBUTES_STRING;
+            } else {
+                return QUEUE_ATTRIBUTES_STRING;
+            }
+        } else if (destination.isTopic()) {
+            if (destination.isTemporary()) {
+                return TEMP_TOPIC_ATTRIBUTES_STRING;
+            } else {
+                return TOPIC_ATTRIBUTES_STRING;
+            }
+        }
+
+        return null;
+    }
+
+    public Set<String> splitAttributes(String typeString) {
+        if (typeString == null) {
+            return null;
+        }
+
+        HashSet<String> typeSet = new HashSet<String>();
+
+        // Split string on commas and their surrounding whitespace
+        for (String attr : typeString.split("\\s*,\\s*")) {
+            // ignore empty values
+            if (!attr.equals("")) {
+                typeSet.add(attr);
+            }
+        }
+
+        return typeSet;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
index 1cefc2a..664d414 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
@@ -21,6 +21,7 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_M
 
 import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
@@ -42,6 +43,7 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
      * Creates a new facade instance
      *
      * @param connection
+     *        the AmqpConnection that under which this facade was created.
      */
     public AmqpJmsBytesMessageFacade(AmqpConnection connection) {
         super(connection);
@@ -52,13 +54,13 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
      * Creates a new Facade around an incoming AMQP Message for dispatch to the
      * JMS Consumer instance.
      *
-     * @param connection
-     *        the connection that created this Facade.
+     * @param consumer
+     *        the consumer that received this message.
      * @param message
      *        the incoming Message instance that is being wrapped.
      */
-    public AmqpJmsBytesMessageFacade(AmqpConnection connection, Message message) {
-        super(connection, message);
+    public AmqpJmsBytesMessageFacade(AmqpConsumer consumer, Message message) {
+        super(consumer, message);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
index 511d1e3..fdda9e4 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
@@ -27,6 +27,7 @@ import java.util.Map;
 
 import org.apache.qpid.jms.message.facade.JmsMapMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Section;
@@ -44,7 +45,7 @@ public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements Jms
      * Create a new facade ready for sending.
      *
      * @param connection
-     *        the connection instance that created this facade.
+     *        the AmqpConnection that under which this facade was created.
      */
     public AmqpJmsMapMessageFacade(AmqpConnection connection) {
         super(connection);
@@ -56,14 +57,14 @@ public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements Jms
      * Creates a new Facade around an incoming AMQP Message for dispatch to the
      * JMS Consumer instance.
      *
-     * @param connection
-     *        the connection that created this Facade.
+     * @param consumer
+     *        the consumer that received this message.
      * @param message
      *        the incoming Message instance that is being wrapped.
      */
     @SuppressWarnings("unchecked")
-    public AmqpJmsMapMessageFacade(AmqpConnection connection, Message message) {
-        super(connection, message);
+    public AmqpJmsMapMessageFacade(AmqpConsumer consumer, Message message) {
+        super(consumer, message);
 
         Section body = getAmqpMessage().getBody();
         if (body == null) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
index c31c50b..a09cee2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageBuilder.java
@@ -25,7 +25,6 @@ import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_S
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_TEXT_MESSAGE;
 
 import java.io.IOException;
-import java.util.Map;
 
 import org.apache.qpid.jms.message.JmsBytesMessage;
 import org.apache.qpid.jms.message.JmsMapMessage;
@@ -33,8 +32,7 @@ import org.apache.qpid.jms.message.JmsMessage;
 import org.apache.qpid.jms.message.JmsObjectMessage;
 import org.apache.qpid.jms.message.JmsStreamMessage;
 import org.apache.qpid.jms.message.JmsTextMessage;
-import org.apache.qpid.jms.provider.amqp.AmqpConnection;
-import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.message.Message;
 
 /**
@@ -50,8 +48,8 @@ public class AmqpJmsMessageBuilder {
      * Create a new JmsMessage and underlying JmsMessageFacade that represents the proper
      * message type for the incoming AMQP message.
      *
-     * @param connection
-     *        The provider AMQP Connection instance where this message arrived at.
+     * @param consumer
+     *        The provider AMQP Consumer instance where this message arrived at.
      * @param message
      *        The Proton Message object that will be wrapped.
      *
@@ -59,10 +57,10 @@ public class AmqpJmsMessageBuilder {
      *
      * @throws IOException if an error occurs while creating the message objects.
      */
-    public static JmsMessage createJmsMessage(AmqpConnection connection, Message message) throws IOException {
+    public static JmsMessage createJmsMessage(AmqpConsumer consumer, Message message) throws IOException {
 
         // First we try the easy way, if the annotation is there we don't have to work hard.
-        JmsMessage result = createFromMsgAnnotation(connection, message);
+        JmsMessage result = createFromMsgAnnotation(consumer, message);
         if (result != null) {
             return result;
         }
@@ -71,23 +69,23 @@ public class AmqpJmsMessageBuilder {
         throw new IOException("Could not create a JMS message from incoming message");
     }
 
-    private static JmsMessage createFromMsgAnnotation(AmqpConnection connection, Message message) throws IOException {
-        Object annotation = getMessageAnnotation(JMS_MSG_TYPE, message);
+    private static JmsMessage createFromMsgAnnotation(AmqpConsumer consumer, Message message) throws IOException {
+        Object annotation = AmqpMessageSupport.getMessageAnnotation(JMS_MSG_TYPE, message);
         if (annotation != null) {
 
             switch ((byte) annotation) {
                 case JMS_MESSAGE:
-                    return new JmsMessage(new AmqpJmsMessageFacade(connection, message));
+                    return new JmsMessage(new AmqpJmsMessageFacade(consumer, message));
                 case JMS_BYTES_MESSAGE:
-                    return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(connection, message));
+                    return new JmsBytesMessage(new AmqpJmsBytesMessageFacade(consumer, message));
                 case JMS_TEXT_MESSAGE:
-                    return new JmsTextMessage(new AmqpJmsTextMessageFacade(connection, message));
+                    return new JmsTextMessage(new AmqpJmsTextMessageFacade(consumer, message));
                 case JMS_MAP_MESSAGE:
-                    return new JmsMapMessage(new AmqpJmsMapMessageFacade(connection, message));
+                    return new JmsMapMessage(new AmqpJmsMapMessageFacade(consumer, message));
                 case JMS_STREAM_MESSAGE:
-                    return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(connection, message));
+                    return new JmsStreamMessage(new AmqpJmsStreamMessageFacade(consumer, message));
                 case JMS_OBJECT_MESSAGE:
-                    return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(connection, message));
+                    return new JmsObjectMessage(new AmqpJmsObjectMessageFacade(consumer, message));
                 default:
                     throw new IOException("Invalid JMS Message Type annotation found in message");
             }
@@ -95,13 +93,4 @@ public class AmqpJmsMessageBuilder {
 
         return null;
     }
-
-    private static Object getMessageAnnotation(String key, Message message) {
-        if (message.getMessageAnnotations() != null) {
-            Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
-            return annotations.get(AmqpMessageSupport.getSymbol(key));
-        }
-
-        return null;
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index 4937000..acd3012 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -37,6 +37,7 @@ import org.apache.qpid.jms.exceptions.IdConversionException;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
 import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
@@ -74,6 +75,9 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     /**
      * Create a new AMQP Message Facade with an empty message instance.
+     *
+     * @param connection
+     *        the AmqpConnection that under which this facade was created.
      */
     public AmqpJmsMessageFacade(AmqpConnection connection) {
         this.message = Proton.message();
@@ -87,15 +91,15 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
      * Creates a new Facade around an incoming AMQP Message for dispatch to the
      * JMS Consumer instance.
      *
-     * @param connection
-     *        the connection that created this Facade.
+     * @param consumer
+     *        the consumer that received this message.
      * @param message
      *        the incoming Message instance that is being wrapped.
      */
     @SuppressWarnings("unchecked")
-    public AmqpJmsMessageFacade(AmqpConnection connection, Message message) {
+    public AmqpJmsMessageFacade(AmqpConsumer consumer, Message message) {
         this.message = message;
-        this.connection = connection;
+        this.connection = consumer.getConnection();
 
         annotations = message.getMessageAnnotations();
         if (annotations != null) {
@@ -112,8 +116,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
             syntheticTTL = System.currentTimeMillis() + ttl;
         }
 
-        // TODO - Set destination
-        // TODO - Set replyTo
+        this.destination = AmqpDestinationHelper.INSTANCE.buildJmsDestination(this, consumer.getDestination());
+        this.replyTo = AmqpDestinationHelper.INSTANCE.buildJmsReplyTo(this, consumer.getDestination());
     }
 
     /**
@@ -223,7 +227,6 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     @Override
     public void clearProperties() {
-        clearProperties();
         //_propJMS_AMQP_TTL = null;
         message.setReplyToGroupId(null);
         message.setUserId(null);
@@ -235,7 +238,7 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     @Override
     public JmsMessageFacade copy() throws JMSException {
-        AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection, message);
+        AmqpJmsMessageFacade copy = new AmqpJmsMessageFacade(connection);
         copyInto(copy);
         return copy;
     }
@@ -250,12 +253,12 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
         AmqpMessageIdHelper helper = AmqpMessageIdHelper.INSTANCE;
         String baseStringId = helper.toBaseMessageIdString(underlying);
 
-        //Ensure the ID: prefix is present.
-        //TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix.
-        //TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities?
-        //      I Ended up putting it in JmsMessage after the above comment, as a workaround for the current JmsDefaultMessageFacade usage.
-        if(baseStringId != null && !helper.hasMessageIdPrefix(baseStringId))
-        {
+        // Ensure the ID: prefix is present.
+        // TODO: should we always do this? AMQP JMS Mapping says never to send the "ID:" prefix.
+        // TODO: should we make this part of the JmsMessageId, or JmsMessage object responsibilities?
+        //       I Ended up putting it in JmsMessage after the above comment, as a workaround for the
+        //       current JmsDefaultMessageFacade usage.
+        if (baseStringId != null && !helper.hasMessageIdPrefix(baseStringId)) {
             baseStringId = AmqpMessageIdHelper.JMS_ID_PREFIX + baseStringId;
         }
         return new JmsMessageId(baseStringId);
@@ -514,8 +517,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     @Override
     public void setDestination(JmsDestination destination) {
         this.destination = destination;
-
-        // TODO
+        lazyCreateAnnotations();
+        AmqpDestinationHelper.INSTANCE.setToAddressFromDestination(this, destination);
     }
 
     @Override
@@ -526,7 +529,8 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
     @Override
     public void setReplyTo(JmsDestination replyTo) {
         this.replyTo = replyTo;
-        // TODO Auto-generated method stub
+        lazyCreateAnnotations();
+        AmqpDestinationHelper.INSTANCE.setReplyToAddressFromDestination(this, replyTo);
     }
 
     public void setReplyToGroupId(String replyToGroupId) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
index 3696653..1668a6c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -27,6 +27,7 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.facade.JmsObjectMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.message.Message;
 
 /**
@@ -38,7 +39,10 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
     private AmqpObjectTypeDelegate delegate;
 
     /**
+     * Creates a new facade instance
+     *
      * @param connection
+     *        the AmqpConnection that under which this facade was created.
      */
     public AmqpJmsObjectMessageFacade(AmqpConnection connection) {
         super(connection);
@@ -49,11 +53,16 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
     }
 
     /**
-     * @param connection
+     * Creates a new Facade around an incoming AMQP Message for dispatch to the
+     * JMS Consumer instance.
+     *
+     * @param consumer
+     *        the consumer that received this message.
      * @param message
+     *        the incoming Message instance that is being wrapped.
      */
-    public AmqpJmsObjectMessageFacade(AmqpConnection connection, Message message) {
-        super(connection, message);
+    public AmqpJmsObjectMessageFacade(AmqpConsumer consumer, Message message) {
+        super(consumer, message);
 
         // TODO detect the content type and init the proper delegate.
         initDelegate(false);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
index 0999225..90cd97e 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
@@ -27,6 +27,7 @@ import javax.jms.MessageEOFException;
 
 import org.apache.qpid.jms.message.facade.JmsStreamMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Section;
@@ -45,7 +46,7 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
      * Create a new facade ready for sending.
      *
      * @param connection
-     *        the connection instance that created this facade.
+     *        the AmqpConnection that under which this facade was created.
      */
     public AmqpJmsStreamMessageFacade(AmqpConnection connection) {
         super(connection);
@@ -57,14 +58,14 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
      * Creates a new Facade around an incoming AMQP Message for dispatch to the
      * JMS Consumer instance.
      *
-     * @param connection
-     *        the connection that created this Facade.
+     * @param consumer
+     *        the consumer that received this message.
      * @param message
      *        the incoming Message instance that is being wrapped.
      */
     @SuppressWarnings("unchecked")
-    public AmqpJmsStreamMessageFacade(AmqpConnection connection, Message message) {
-        super(connection, message);
+    public AmqpJmsStreamMessageFacade(AmqpConsumer consumer, Message message) {
+        super(consumer, message);
 
         Section body = getAmqpMessage().getBody();
         if (body == null) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
index 6c2421b..c2d6fc2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
@@ -30,6 +30,7 @@ import javax.jms.JMSException;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.messaging.AmqpValue;
 import org.apache.qpid.proton.amqp.messaging.Data;
@@ -56,7 +57,7 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
      * Create a new AMQP Message facade ready for sending.
      *
      * @param connection
-     *        The AMQP Connection that created this message.
+     *        the AmqpConnection that under which this facade was created.
      */
     public AmqpJmsTextMessageFacade(AmqpConnection connection) {
         super(connection);
@@ -68,13 +69,13 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
      * Creates a new Facade around an incoming AMQP Message for dispatch to the
      * JMS Consumer instance.
      *
-     * @param connection
-     *        the connection that created this Facade.
+     * @param consumer
+     *        the consumer that received this message.
      * @param message
      *        the incoming Message instance that is being wrapped.
      */
-    public AmqpJmsTextMessageFacade(AmqpConnection connection, Message message) {
-        super(connection, message);
+    public AmqpJmsTextMessageFacade(AmqpConsumer consumer, Message message) {
+        super(consumer, message);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index a01d415..563352b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms.provider.amqp.message;
 
+import java.util.Map;
+
 import javax.jms.Destination;
 import javax.jms.Queue;
 import javax.jms.TemporaryQueue;
@@ -23,6 +25,7 @@ import javax.jms.TemporaryTopic;
 import javax.jms.Topic;
 
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.message.Message;
 
 /**
  * Support class containing constant values and static methods that are
@@ -166,4 +169,25 @@ public final class AmqpMessageSupport {
 
         return null;
     }
+
+    /**
+     * Safe way to access message annotations which will check internal structure and
+     * either return the annotation if it exists or null if the annotation or any annotations
+     * are present.
+     *
+     * @param key
+     *        the String key to use to lookup an annotation.
+     * @param message
+     *        the AMQP message object that is being examined.
+     *
+     * @return the given annotation value or null if not present in the message.
+     */
+    public static Object getMessageAnnotation(String key, Message message) {
+        if (message != null && message.getMessageAnnotations() != null) {
+            Map<Symbol, Object> annotations = message.getMessageAnnotations().getValue();
+            return annotations.get(AmqpMessageSupport.getSymbol(key));
+        }
+
+        return null;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/eae45830/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index a276c7f..af68cf3 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -20,7 +20,10 @@
  */
 package org.apache.qpid.jms.provider.amqp.message;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import java.nio.ByteBuffer;
 import java.util.HashMap;
@@ -29,6 +32,7 @@ import java.util.UUID;
 
 import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
+import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.Symbol;
@@ -36,7 +40,6 @@ import org.apache.qpid.proton.amqp.UnsignedLong;
 import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.amqp.messaging.Properties;
 import org.apache.qpid.proton.message.Message;
-import org.junit.Ignore;
 import org.junit.Test;
 import org.mockito.Mockito;
 
@@ -46,8 +49,15 @@ public class AmqpJmsMessageFacadeTest {
         return new AmqpJmsMessageFacade(createMockAmqpConnection());
     }
 
-    private AmqpJmsMessageFacade createReceivedMessageFacade(AmqpConnection amqpConnection, Message message) {
-        return new AmqpJmsMessageFacade(amqpConnection, message);
+    private AmqpJmsMessageFacade createReceivedMessageFacade(AmqpConsumer amqpConsumer, Message message) {
+        return new AmqpJmsMessageFacade(amqpConsumer, message);
+    }
+
+
+    private AmqpConsumer createMockAmqpConsumer() {
+        AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class);
+        Mockito.when(consumer.getConnection()).thenReturn(createMockAmqpConnection());
+        return consumer;
     }
 
     private AmqpConnection createMockAmqpConnection() {
@@ -225,7 +235,7 @@ public class AmqpJmsMessageFacadeTest {
             expected = AmqpMessageIdHelper.JMS_ID_PREFIX + expected;
         }
 
-        AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConnection(), message);
+        AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConsumer(), message);
 
         assertNotNull("Expected a correlationId on received message", amqpMessageFacade.getCorrelationId());
 
@@ -305,7 +315,7 @@ public class AmqpJmsMessageFacadeTest {
         props.setMessageId(underlyingIdObject);
         message.setProperties(props);
 
-        AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConnection(), message);
+        AmqpJmsMessageFacade amqpMessageFacade = createReceivedMessageFacade(createMockAmqpConsumer(), message);
 
         assertNotNull("Expected a messageId on received message", amqpMessageFacade.getMessageId());
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


[4/4] git commit: Add mock return for consumer.getDestination

Posted by ta...@apache.org.
Add mock return for consumer.getDestination

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bdc47a55
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bdc47a55
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bdc47a55

Branch: refs/heads/master
Commit: bdc47a55c467e7d9acfc8dd0f6db57563ba295ee
Parents: 459c90a
Author: Timothy Bish <ta...@gmail.com>
Authored: Wed Sep 24 16:14:31 2014 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Wed Sep 24 16:14:31 2014 -0400

----------------------------------------------------------------------
 .../jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java     | 5 +++++
 1 file changed, 5 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bdc47a55/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index af68cf3..c1200fb 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -30,6 +30,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.UUID;
 
+import org.apache.qpid.jms.JmsDestination;
+import org.apache.qpid.jms.JmsTopic;
 import org.apache.qpid.jms.meta.JmsMessageId;
 import org.apache.qpid.jms.provider.amqp.AmqpConnection;
 import org.apache.qpid.jms.provider.amqp.AmqpConsumer;
@@ -45,6 +47,8 @@ import org.mockito.Mockito;
 
 public class AmqpJmsMessageFacadeTest {
 
+    private final JmsDestination consumerDestination = new JmsTopic("TestTopic");
+
     private AmqpJmsMessageFacade createNewMessageFacade() {
         return new AmqpJmsMessageFacade(createMockAmqpConnection());
     }
@@ -57,6 +61,7 @@ public class AmqpJmsMessageFacadeTest {
     private AmqpConsumer createMockAmqpConsumer() {
         AmqpConsumer consumer = Mockito.mock(AmqpConsumer.class);
         Mockito.when(consumer.getConnection()).thenReturn(createMockAmqpConnection());
+        Mockito.when(consumer.getDestination()).thenReturn(consumerDestination);
         return consumer;
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org