You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:06 UTC

[5/7] qpid-jms git commit: QPIDJMS-207 Adds dependency on JMS 2.0 API and initial implementation.

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
index 418d85c..bb2eb0b 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms.message;
 
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.checkPropertyNameIsValid;
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.checkValidObject;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_DELIVERY_COUNT;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPID;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ;
@@ -42,7 +44,6 @@ import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
-import javax.jms.MessageFormatException;
 
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.util.TypeConversionSupport;
@@ -808,74 +809,4 @@ public class JmsMessagePropertyIntercepter {
 
         return names;
     }
-
-    //----- Property Validation Methods --------------------------------------//
-
-    private static void checkPropertyNameIsValid(String propertyName, boolean validateNames) throws IllegalArgumentException {
-        if (propertyName == null) {
-            throw new IllegalArgumentException("Property name must not be null");
-        } else if (propertyName.length() == 0) {
-            throw new IllegalArgumentException("Property name must not be the empty string");
-        }
-
-        if (validateNames) {
-            checkIdentifierLetterAndDigitRequirements(propertyName);
-            checkIdentifierIsntNullTrueFalse(propertyName);
-            checkIdentifierIsntLogicOperator(propertyName);
-        }
-    }
-
-    private static void checkIdentifierIsntLogicOperator(String identifier) {
-        // Identifiers cannot be NOT, AND, OR, BETWEEN, LIKE, IN, IS, or ESCAPE.
-        if ("NOT".equals(identifier) || "AND".equals(identifier) || "OR".equals(identifier) ||
-            "BETWEEN".equals(identifier) || "LIKE".equals(identifier) || "IN".equals(identifier) ||
-            "IS".equals(identifier) || "ESCAPE".equals(identifier)) {
-
-            throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
-        }
-    }
-
-    private static void checkIdentifierIsntNullTrueFalse(String identifier) {
-        // Identifiers cannot be the names NULL, TRUE, and FALSE.
-        if ("NULL".equals(identifier) || "TRUE".equals(identifier) || "FALSE".equals(identifier)) {
-            throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
-        }
-    }
-
-    private static void checkIdentifierLetterAndDigitRequirements(String identifier) {
-        // An identifier is an unlimited-length sequence of letters and digits, the first of
-        // which must be a letter.  A letter is any character for which the method
-        // Character.isJavaLetter returns true.  This includes '_' and '$'.  A letter or digit
-        // is any character for which the method Character.isJavaLetterOrDigit returns true.
-        char startChar = identifier.charAt(0);
-        if (!(Character.isJavaIdentifierStart(startChar))) {
-            throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' ");
-        }
-
-        // JMS part character
-        int length = identifier.length();
-        for (int i = 1; i < length; i++) {
-            char ch = identifier.charAt(i);
-            if (!(Character.isJavaIdentifierPart(ch))) {
-                throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + ch + "': '" + identifier + "' ");
-            }
-        }
-    }
-
-    private static void checkValidObject(Object value) throws MessageFormatException {
-        boolean valid = value instanceof Boolean ||
-                        value instanceof Byte ||
-                        value instanceof Short ||
-                        value instanceof Integer ||
-                        value instanceof Long ||
-                        value instanceof Float ||
-                        value instanceof Double ||
-                        value instanceof Character ||
-                        value instanceof String ||
-                        value == null;
-
-        if (!valid) {
-            throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
-        }
-    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertySupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertySupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertySupport.java
new file mode 100644
index 0000000..2c3576c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertySupport.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.message;
+
+import javax.jms.JMSException;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.jms.util.TypeConversionSupport;
+
+/**
+ * Provides methods for use when working with JMS Message Properties and their values.
+ */
+public class JmsMessagePropertySupport {
+
+    //----- Conversions Validation for Message Properties --------------------//
+
+    @SuppressWarnings("unchecked")
+    public static <T> T convertPropertyTo(String name, Object value, Class<T> target) throws JMSException {
+        if (value != null) {
+            // Non-null values go through the shared converter; a null result is reported as a format error.
+            final T converted = (T) TypeConversionSupport.convert(value, target);
+            if (converted != null) {
+                return converted;
+            }
+            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a " + target.getName());
+        }
+
+        // Null value: Boolean reads as false, Float/Double raise NPE, other Number types raise NFE.
+        if (Boolean.class.equals(target)) {
+            return (T) Boolean.FALSE;
+        } else if (Float.class.equals(target) || Double.class.equals(target)) {
+            throw new NullPointerException("property " + name + " was null");
+        } else if (Number.class.isAssignableFrom(target)) {
+            throw new NumberFormatException("property " + name + " was null");
+        }
+        return null;
+    }
+
+    //----- Property Name Validation Methods ---------------------------------//
+
+    public static void checkPropertyNameIsValid(String propertyName, boolean validateNames) throws IllegalArgumentException {
+        if (propertyName == null) {
+            throw new IllegalArgumentException("Property name must not be null");
+        }
+        if (propertyName.isEmpty()) {
+            throw new IllegalArgumentException("Property name must not be the empty string");
+        }
+        if (validateNames) { // identifier syntax and reserved words are only checked on request
+            checkIdentifierLetterAndDigitRequirements(propertyName);
+            checkIdentifierIsntNullTrueFalse(propertyName);
+            checkIdentifierIsntLogicOperator(propertyName);
+        }
+    }
+
+    public static void checkIdentifierIsntLogicOperator(String identifier) {
+        // NOT, AND, OR, BETWEEN, LIKE, IN, IS and ESCAPE are rejected (exact, case-sensitive match).
+        final String[] operators = { "NOT", "AND", "OR", "BETWEEN", "LIKE", "IN", "IS", "ESCAPE" };
+        for (String operator : operators) {
+            if (operator.equals(identifier)) {
+                throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
+            }
+        }
+    }
+
+    public static void checkIdentifierIsntNullTrueFalse(String identifier) {
+        final boolean reserved = "NULL".equals(identifier) || "TRUE".equals(identifier) || "FALSE".equals(identifier);
+        if (reserved) { // NULL, TRUE and FALSE may not be used as identifiers (exact, case-sensitive match)
+            throw new IllegalArgumentException("Identifier not allowed in JMS: '" + identifier + "'");
+        }
+    }
+
+    public static void checkIdentifierLetterAndDigitRequirements(String identifier) {
+        // The first character must satisfy Character.isJavaIdentifierStart (letters, '_' and '$'
+        // qualify) and every later character must satisfy Character.isJavaIdentifierPart.
+        final char first = identifier.charAt(0);
+        if (!Character.isJavaIdentifierStart(first)) {
+            throw new IllegalArgumentException("Identifier does not begin with a valid JMS identifier start character: '" + identifier + "' ");
+        }
+
+        // Scan the remainder and fail on the first character that is not a valid part.
+        int index = 1;
+        while (index < identifier.length()) {
+            final char part = identifier.charAt(index);
+            if (!Character.isJavaIdentifierPart(part)) {
+                throw new IllegalArgumentException("Identifier contains invalid JMS identifier character '" + part + "': '" + identifier + "' ");
+            }
+
+            index++;
+        }
+    }
+
+    //----- Property Type Validation Methods ---------------------------------//
+
+    public static void checkValidObject(Object value) throws MessageFormatException {
+        // Null, String, Character and the boxed boolean / numeric primitives are the accepted values.
+        if (value == null || value instanceof String || value instanceof Character || value instanceof Boolean) {
+            return;
+        }
+
+        // Byte, Short, Integer, Long, Float and Double are accepted individually; other Number
+        // subclasses (BigDecimal, for example) are not.
+        if (value instanceof Byte || value instanceof Short || value instanceof Integer ||
+            value instanceof Long || value instanceof Float || value instanceof Double) {
+            return;
+        }
+
+        // value is known to be non-null here, so getClass() is safe.
+        throw new MessageFormatException("Only objectified primitive objects and String types are allowed but was: " + value + " type: " + value.getClass());
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java
index 902719b..e8e0d22 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsObjectMessage.java
@@ -60,6 +60,25 @@ public class JmsObjectMessage extends JmsMessage implements ObjectMessage {
     }
 
     @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        if (facade.hasBody()) {
+            // Object and Serializable always match; any other type is tested against the payload object.
+            return Object.class == target || Serializable.class == target || target.isInstance(getObject());
+        }
+        return true; // no body: every target type is accepted
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    protected <T> T doGetBody(Class<T> asType) throws JMSException {
+        try {
+            return (T) getObject();
+        } catch (JMSException e) { // still surfaced as MessageFormatException, now with the original failure as its cause
+            throw (MessageFormatException) new MessageFormatException("Failed to read Object: " + e.getMessage()).initCause(e);
+        }
+    }
+
+    @Override
     public String toString() {
         return "JmsObjectMessageFacade { " + facade.toString() + " }";
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java
index b334249..c9765f3 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsStreamMessage.java
@@ -475,6 +475,11 @@ public class JmsStreamMessage extends JmsMessage implements StreamMessage {
         return "JmsStreamMessage { " + facade.toString() + " }";
     }
 
+    @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        return false; // always false for a stream message, whatever the target type
+    }
+
     private void checkBytesInFlight() throws MessageFormatException {
         if (remainingBytes != NO_BYTES_IN_FLIGHT) {
             throw new MessageFormatException(

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java
index 69aefe8..9647b57 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsTextMessage.java
@@ -22,6 +22,7 @@ import javax.jms.TextMessage;
 
 import org.apache.qpid.jms.message.facade.JmsTextMessageFacade;
 
+@SuppressWarnings("unchecked")
 public class JmsTextMessage extends JmsMessage implements TextMessage {
 
     private final JmsTextMessageFacade facade;
@@ -57,4 +58,14 @@ public class JmsTextMessage extends JmsMessage implements TextMessage {
     public String toString() {
         return "JmsTextMessage { " + facade.toString() + " }";
     }
+
+    @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        return !facade.hasBody() || target.isAssignableFrom(String.class); // no body: any target type is accepted
+    }
+
+    @Override
+    protected <T> T doGetBody(Class<T> asType) throws JMSException {
+        return (T) getText(); // unchecked cast of the text payload; asType itself is not consulted here
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
index 7321a8e..73117b9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsBytesMessageFacade.java
@@ -86,4 +86,10 @@ public interface JmsBytesMessageFacade extends JmsMessageFacade {
      * @return the number of bytes contained in the body of the message.
      */
     int getBodyLength();
+
+    /**
+     * @return a copy of the bytes contained in the body of the message.
+     */
+    byte[] copyBody();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
index 1741251..d5616f5 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/facade/JmsMessageFacade.java
@@ -315,6 +315,25 @@ public interface JmsMessageFacade {
     void setExpiration(long expiration);
 
     /**
+     * Returns the set delivery time for this message.
+     *
+     * The value should be returned as an absolute time given in GMT time.
+     *
+     * @return the earliest time that the message should be made available for delivery.
+     */
+    long getDeliveryTime();
+
+    /**
+     * Sets a desired delivery time on this message.
+     *
+     * The delivery time will be given as an absolute time in GMT time.
+     *
+     * @param deliveryTime
+     *        the earliest time that the message should be made available for delivery.
+     */
+    void setDeliveryTime(long deliveryTime);
+
+    /**
      * Gets the Destination value that was assigned to this message at the time it was
      * sent.
      *
@@ -429,4 +448,11 @@ public interface JmsMessageFacade {
      */
     void setProviderMessageIdObject(Object messageId);
 
+    /**
+     * Returns true if the underlying message has a body, false if the body is empty.
+     *
+     * @return true if the underlying message has a body, false if the body is empty.
+     */
+    boolean hasBody();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
index 369dfa9..6b63fb7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsBytesMessageFacade.java
@@ -19,10 +19,6 @@ package org.apache.qpid.jms.provider.amqp.message;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_BYTES_MESSAGE;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.ByteBufInputStream;
-import io.netty.buffer.ByteBufOutputStream;
-import io.netty.buffer.Unpooled;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -40,6 +36,11 @@ import org.apache.qpid.proton.amqp.messaging.Data;
 import org.apache.qpid.proton.amqp.messaging.Section;
 import org.apache.qpid.proton.message.Message;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufInputStream;
+import io.netty.buffer.ByteBufOutputStream;
+import io.netty.buffer.Unpooled;
+
 /**
  * A JmsBytesMessageFacade that wraps around Proton AMQP Message instances to provide
  * access to the underlying bytes contained in the message.
@@ -216,6 +217,21 @@ public class AmqpJmsBytesMessageFacade extends AmqpJmsMessageFacade implements J
     }
 
     @Override
+    public boolean hasBody() {
+        return getBinaryFromBody().getLength() != 0; // zero-length binary content is reported as no body
+    }
+
+    @Override
+    public byte[] copyBody() {
+        // Copy only the window [arrayOffset, arrayOffset + length) of the backing array.
+        final Binary payload = getBinaryFromBody();
+        final int size = payload.getLength();
+        final byte[] copy = new byte[size];
+        System.arraycopy(payload.getArray(), payload.getArrayOffset(), copy, 0, size);
+        return copy;
+    }
+
+    @Override
     public void onSend(long producerTtl) throws JMSException {
         super.onSend(producerTtl);
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
index dc2cf0b..fe242f7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMapMessageFacade.java
@@ -141,6 +141,11 @@ public class AmqpJmsMapMessageFacade extends AmqpJmsMessageFacade implements Jms
         messageBodyMap.clear();
     }
 
+    @Override
+    public boolean hasBody() {
+        return !messageBodyMap.isEmpty(); // a map with no entries is reported as no body
+    }
+
     private void initializeEmptyBody() {
         // Using LinkedHashMap because AMQP map equality considers order,
         // so we should behave in as predictable a manner as possible

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index f98dccc..82f63e0 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -553,6 +553,18 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
         }
     }
 
+    @Override
+    public long getDeliveryTime() {
+        // TODO: placeholder - no delivery time is stored by this facade yet, so 0 is always returned.
+        return 0;
+    }
+
+    @Override
+    public void setDeliveryTime(long deliveryTime) {
+        // TODO: placeholder - the supplied delivery time is currently ignored.
+
+    }
+
     /**
      * Sets a value which will be used to override any ttl value that may otherwise be set
      * based on the expiration value when sending the underlying AMQP message. A value of 0
@@ -702,6 +714,11 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
         }
     }
 
+    @Override
+    public boolean hasBody() {
+        return message.getBody() != null; // true only when a body section is present (was inverted: == null)
+    }
+
     /**
      * @return the true AMQP Message instance wrapped by this Facade.
      */

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
index fabefed..f4b541f 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsObjectMessageFacade.java
@@ -125,6 +125,11 @@ public class AmqpJmsObjectMessageFacade extends AmqpJmsMessageFacade implements
     }
 
     @Override
+    public boolean hasBody() {
+        return delegate.hasBody(); // the answer comes from the object type delegate
+    }
+
+    @Override
     public void onSend(long producerTtl) throws JMSException {
         super.onSend(producerTtl);
         delegate.onSend();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
index d63d4a3..64f1fbd 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsStreamMessageFacade.java
@@ -161,6 +161,11 @@ public class AmqpJmsStreamMessageFacade extends AmqpJmsMessageFacade implements
         position = 0;
     }
 
+    @Override
+    public boolean hasBody() {
+        return !list.isEmpty(); // a stream with no entries is reported as no body
+    }
+
     private List<Object> initializeEmptyBodyList(boolean useSequenceBody) {
         List<Object> emptyList = new ArrayList<Object>();
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
index e3106e6..44ed9e6 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsTextMessageFacade.java
@@ -135,6 +135,15 @@ public class AmqpJmsTextMessageFacade extends AmqpJmsMessageFacade implements Jm
         setText(null);
     }
 
+    @Override
+    public boolean hasBody() {
+        try {
+            return getText() != null; // null text is reported as no body
+        } catch (JMSException e) {
+            return false; // a failure while reading the text is also reported as no body
+        }
+    }
+
     Charset getCharset() {
         return charset;
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
index 7657343..3c1fa12 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpObjectTypeDelegate.java
@@ -64,4 +64,7 @@ public interface AmqpObjectTypeDelegate {
     void copyInto(AmqpObjectTypeDelegate copy) throws Exception;
 
     boolean isAmqpTypeEncoded();
+
+    boolean hasBody();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
index 618d123..ec73ba9 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpSerializedObjectDelegate.java
@@ -194,4 +194,13 @@ public class AmqpSerializedObjectDelegate implements AmqpObjectTypeDelegate, Tru
             return true;
         }
     }
+
+    @Override
+    public boolean hasBody() {
+        try {
+            return getObject() != null; // a null object is reported as no body
+        } catch (Exception e) {
+            return false; // any failure while reading the object is also reported as no body
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
index cc1038f..1296eaa 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpTypedObjectDelegate.java
@@ -168,6 +168,15 @@ public class AmqpTypedObjectDelegate implements AmqpObjectTypeDelegate {
         return true;
     }
 
+    @Override
+    public boolean hasBody() {
+        try {
+            return getObject() != null; // a null object is reported as no body
+        } catch (Exception e) {
+            return false; // any failure while reading the object is also reported as no body
+        }
+    }
+
     //----- Internal implementation ------------------------------------------//
 
     private boolean isSupportedAmqpValueObjectType(Serializable serializable) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
index 3f5b84b..4928faa 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTest.java
@@ -266,8 +266,14 @@ public class JmsConnectionTest {
     }
 
     @Test(timeout=30000, expected=JMSException.class)
-    public void testCreateDurableConnectionConsumer() throws Exception {
+    public void testCreateSharedConnectionConsumer() throws Exception {
         connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
-        connection.createDurableConnectionConsumer(new JmsTopic(), "id", "", null, 1);
+        connection.createSharedConnectionConsumer(new JmsTopic(), "id", "", null, 1);
+    }
+
+    @Test(timeout=30000, expected=JMSException.class)
+    public void testCreateSharedDurableConnectionConsumer() throws Exception {
+        connection = new JmsConnection("ID:TEST:1", provider, clientIdGenerator);
+        connection.createSharedDurableConnectionConsumer(new JmsTopic(), "id", "", null, 1);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
index 0f4a9ce..3986f8b 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionTestSupport.java
@@ -18,6 +18,8 @@ package org.apache.qpid.jms;
 
 import java.net.URI;
 
+import javax.jms.JMSContext;
+
 import org.apache.qpid.jms.provider.Provider;
 import org.apache.qpid.jms.provider.ProviderListener;
 import org.apache.qpid.jms.provider.mock.MockProviderFactory;
@@ -51,6 +53,13 @@ public class JmsConnectionTestSupport extends QpidJmsTestCase {
         });
     }
 
+    protected JmsContext createJMSContextToMockProvider() throws Exception {
+        JmsConnection connection = new JmsConnection("ID:TEST:1", createMockProvider(), clientIdGenerator);
+        JmsContext context = new JmsContext(connection, JMSContext.AUTO_ACKNOWLEDGE);
+
+        return context;
+    }
+
     protected JmsConnection createConnectionToMockProvider() throws Exception {
         return new JmsConnection("ID:TEST:1", createMockProvider(), clientIdGenerator);
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
index a3a6ce2..2e559ac 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java
@@ -106,6 +106,30 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCreateAutoAckSessionByDefault() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            Session session = connection.createSession();
+            assertNotNull("Session should not be null", session);
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateAutoAckSessionUsingAckModeOnlyMethod() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            Session session = connection.createSession(Session.AUTO_ACKNOWLEDGE);
+            assertNotNull("Session should not be null", session);
+            testPeer.expectClose();
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testCreateTransactedSession() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);
@@ -132,6 +156,32 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testCreateTransactedSessionUsingAckModeOnlyMethod() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            testPeer.expectBegin();
+            // Expect the session, with an immediate link to the transaction coordinator
+            // using a target with the expected capabilities only.
+            CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher();
+            txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN));
+            testPeer.expectSenderAttach(txCoordinatorMatcher, false, false);
+
+            // First expect an unsettled 'declare' transfer to the txn coordinator, and
+            // reply with a declared disposition state containing the txnId.
+            Binary txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4});
+            testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectClose();
+
+            Session session = connection.createSession(Session.SESSION_TRANSACTED);
+            assertNotNull("Session should not be null", session);
+
+            connection.close();
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testCreateTransactedSessionFailsWhenNoDetachResponseSent() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
index c0a3cbc..7c33fc1 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IntegrationTestFixture.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
+import javax.jms.JMSContext;
 import javax.jms.JMSException;
 
 import org.apache.qpid.jms.JmsConnectionFactory;
@@ -61,22 +62,80 @@ public class IntegrationTestFixture {
         // Each connection creates a session for managing temporary destinations etc
         testPeer.expectBegin();
 
-        String scheme = ssl ? "amqps" : "amqp";
-        final String baseURI = scheme + "://localhost:" + testPeer.getServerPort();
-        String remoteURI = baseURI;
-        if (optionsString != null) {
-            remoteURI = baseURI + optionsString;
-        }
+        String remoteURI = buildURI(testPeer, ssl, optionsString);
 
         ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
         Connection connection = factory.createConnection("guest", "guest");
 
-        if(setClientId) {
+        if (setClientId) {
             // Set a clientId to provoke the actual AMQP connection process to occur.
             connection.setClientID("clientName");
         }
 
         assertNull(testPeer.getThrowable());
+
         return connection;
     }
+
+    JMSContext createJMSContext(TestAmqpPeer testPeer) throws JMSException {
+        return createJMSContext(testPeer, null, null, null);
+    }
+
+    JMSContext createJMSContext(TestAmqpPeer testPeer, int sessionMode) throws JMSException {
+        return createJMSContext(testPeer, false, null, null, null, true, sessionMode);
+    }
+
+    JMSContext createJMSContext(TestAmqpPeer testPeer, String optionsString) throws JMSException {
+        return createJMSContext(testPeer, optionsString, null, null);
+    }
+
+    JMSContext createJMSContext(TestAmqpPeer testPeer, Symbol[] serverCapabilities) throws JMSException {
+        return createJMSContext(testPeer, null, serverCapabilities, null);
+    }
+
+    JMSContext createJMSContext(TestAmqpPeer testPeer, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties) throws JMSException {
+        return createJMSContext(testPeer, null, serverCapabilities, serverProperties);
+    }
+
+    JMSContext createJMSContext(TestAmqpPeer testPeer, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties) throws JMSException {
+        return createJMSContext(testPeer, false, optionsString, serverCapabilities, serverProperties, true, JMSContext.AUTO_ACKNOWLEDGE);
+    }
+
+    JMSContext createJMSContext(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId) throws JMSException {
+        return createJMSContext(testPeer, ssl, optionsString, serverCapabilities, serverProperties, setClientId, JMSContext.AUTO_ACKNOWLEDGE); // forward the caller's ssl flag; session mode defaults to AUTO_ACKNOWLEDGE
+    }
+
+    JMSContext createJMSContext(TestAmqpPeer testPeer, boolean ssl, String optionsString, Symbol[] serverCapabilities, Map<Symbol, Object> serverProperties, boolean setClientId, int sessionMode) throws JMSException {
+        Symbol[] desiredCapabilities = new Symbol[] { AmqpSupport.SOLE_CONNECTION_CAPABILITY };
+
+        testPeer.expectSaslPlainConnect("guest", "guest", desiredCapabilities, serverCapabilities, serverProperties);
+
+        // Each connection creates a session for managing temporary destinations etc
+        testPeer.expectBegin();
+
+        String remoteURI = buildURI(testPeer, ssl, optionsString);
+
+        ConnectionFactory factory = new JmsConnectionFactory(remoteURI);
+        JMSContext context = factory.createContext("guest", "guest", sessionMode);
+
+        if (setClientId) {
+            // Set a clientId to provoke the actual AMQP connection process to occur.
+            context.setClientID("clientName");
+        }
+
+        assertNull(testPeer.getThrowable());
+
+        return context;
+    }
+
+    String buildURI(TestAmqpPeer testPeer, boolean ssl, String optionsString) {
+        String scheme = ssl ? "amqps" : "amqp";
+        final String baseURI = scheme + "://localhost:" + testPeer.getServerPort();
+        String remoteURI = baseURI;
+        if (optionsString != null) {
+            remoteURI = baseURI + optionsString;
+        }
+
+        return remoteURI;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSConsumerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSConsumerIntegrationTest.java
new file mode 100644
index 0000000..adc546d
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSConsumerIntegrationTest.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.hamcrest.Matchers.notNullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import java.io.ByteArrayOutputStream;
+import java.io.ObjectOutputStream;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.Map;
+
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSRuntimeException;
+import javax.jms.Message;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageListener;
+import javax.jms.Queue;
+
+import org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport;
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.Wait;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.basictypes.AmqpError;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedInteger;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JMSConsumerIntegrationTest extends QpidJmsTestCase {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JMSConsumerIntegrationTest.class);
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    @Test(timeout = 20000)
+    public void testCreateConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+
+            Queue queue = context.createQueue("test");
+            JMSConsumer consumer = context.createConsumer(queue);
+            assertNotNull(consumer);
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testRemotelyCloseJMSConsumer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+
+            testPeer.expectBegin();
+
+            // Create a consumer, then remotely end it afterwards.
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+            testPeer.remotelyDetachLastOpenedLinkOnLastOpenedSession(true, true, AmqpError.RESOURCE_DELETED, "resource closed");
+
+            Queue queue = context.createQueue("myQueue");
+            final JMSConsumer consumer = context.createConsumer(queue);
+
+            // Verify the consumer gets marked closed
+            testPeer.waitForAllHandlersToComplete(1000);
+            assertTrue("JMSConsumer never closed.", Wait.waitFor(new Wait.Condition() {
+                @Override
+                public boolean isSatisified() throws Exception {
+                    try {
+                        consumer.getMessageListener();
+                    } catch (IllegalStateRuntimeException jmsise) {
+                        return true;
+                    }
+                    return false;
+                }
+            }, 10000, 10));
+
+            // Try closing it explicitly, should effectively no-op in client.
+            // The test peer will throw during close if it sends anything.
+            consumer.close();
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveMessageWithReceiveZeroTimeout() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+
+            testPeer.expectBegin();
+
+            Queue queue = context.createQueue("myQueue");
+
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            JMSConsumer messageConsumer = context.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(0);
+
+            assertNotNull("A message should have been recieved", receivedMessage);
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testConsumerReceiveNoWaitThrowsIfConnectionLost() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+
+            testPeer.expectBegin();
+
+            Queue queue = context.createQueue("queue");
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow(false, notNullValue(UnsignedInteger.class));
+            testPeer.expectLinkFlow(true, notNullValue(UnsignedInteger.class));
+            testPeer.dropAfterLastHandler();
+
+            final JMSConsumer consumer = context.createConsumer(queue);
+
+            try {
+                consumer.receiveNoWait();
+                fail("An exception should have been thrown");
+            } catch (JMSRuntimeException jmsre) {
+                // Expected
+            }
+        }
+    }
+
+    @Test(timeout=20000)
+    public void testNoReceivedMessagesWhenConnectionNotStarted() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+            context.setAutoStart(false);
+
+            testPeer.expectBegin();
+
+            Queue destination = context.createQueue(getTestName());
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType("content"), 3);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            JMSConsumer consumer = context.createConsumer(destination);
+
+            assertNull(consumer.receive(100));
+
+            context.start();
+
+            assertNotNull(consumer.receive(2000));
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout=60000)
+    public void testSyncReceiveFailsWhenListenerSet() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+
+            testPeer.expectBegin();
+
+            Queue destination = context.createQueue(getTestName());
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlow();
+
+            JMSConsumer consumer = context.createConsumer(destination);
+
+            consumer.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message m) {
+                    LOG.warn("Async consumer got unexpected Message: {}", m);
+                }
+            });
+
+            try {
+                consumer.receive();
+                fail("Should have thrown an exception.");
+            } catch (JMSRuntimeException ex) { // expected: blocking receive must be rejected once a listener is set
+            }
+
+            try {
+                consumer.receive(1000);
+                fail("Should have thrown an exception.");
+            } catch (JMSRuntimeException ex) { // expected: timed receive must be rejected once a listener is set
+            }
+
+            try {
+                consumer.receiveNoWait();
+                fail("Should have thrown an exception.");
+            } catch (JMSRuntimeException ex) { // expected: receiveNoWait must be rejected once a listener is set
+            }
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(2000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyMapMessage() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+
+            testPeer.expectBegin();
+
+            Queue queue = context.createQueue("myQueue");
+
+            // Prepare an AMQP message for the test peer to send, containing an
+            // AmqpValue section holding a map with entries for each supported type,
+            // and annotated as a JMS map message.
+            String myBoolKey = "myBool";
+            boolean myBool = true;
+            String myByteKey = "myByte";
+            byte myByte = 4;
+            String myBytesKey = "myBytes";
+            byte[] myBytes = myBytesKey.getBytes();
+            String myCharKey = "myChar";
+            char myChar = 'd';
+            String myDoubleKey = "myDouble";
+            double myDouble = 1234567890123456789.1234;
+            String myFloatKey = "myFloat";
+            float myFloat = 1.1F;
+            String myIntKey = "myInt";
+            int myInt = Integer.MAX_VALUE;
+            String myLongKey = "myLong";
+            long myLong = Long.MAX_VALUE;
+            String myShortKey = "myShort";
+            short myShort = 25;
+            String myStringKey = "myString";
+            String myString = myStringKey;
+
+            Map<String, Object> map = new LinkedHashMap<String, Object>();
+            map.put(myBoolKey, myBool);
+            map.put(myByteKey, myByte);
+            map.put(myBytesKey, new Binary(myBytes));// the underlying AMQP message uses Binary rather than byte[] directly.
+            map.put(myCharKey, myChar);
+            map.put(myDoubleKey, myDouble);
+            map.put(myFloatKey, myFloat);
+            map.put(myIntKey, myInt);
+            map.put(myLongKey, myLong);
+            map.put(myShortKey, myShort);
+            map.put(myStringKey, myString);
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_MAP_MESSAGE);
+
+            DescribedType amqpValueSectionContent = new AmqpValueDescribedType(map);
+
+            // receive the message from the test peer
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueSectionContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            JMSConsumer messageConsumer = context.createConsumer(queue);
+            @SuppressWarnings("unchecked")
+            Map<String, Object> receivedMap = messageConsumer.receiveBody(Map.class, 3000);
+
+            // verify the content is as expected
+            assertNotNull("Map was not received", receivedMap);
+
+            assertEquals("Unexpected boolean value", myBool, receivedMap.get(myBoolKey));
+            assertEquals("Unexpected byte value", myByte, receivedMap.get(myByteKey));
+            byte[] readBytes = (byte[]) receivedMap.get(myBytesKey);
+            assertTrue("Read bytes were not as expected: " + Arrays.toString(readBytes), Arrays.equals(myBytes, readBytes));
+            assertEquals("Unexpected char value", myChar, receivedMap.get(myCharKey));
+            assertEquals("Unexpected double value", myDouble, (double) receivedMap.get(myDoubleKey), 0.0);
+            assertEquals("Unexpected float value", myFloat, (float) receivedMap.get(myFloatKey), 0.0);
+            assertEquals("Unexpected int value", myInt, receivedMap.get(myIntKey));
+            assertEquals("Unexpected long value", myLong, receivedMap.get(myLongKey));
+            assertEquals("Unexpected short value", myShort, receivedMap.get(myShortKey));
+            assertEquals("Unexpected UTF value", myString, receivedMap.get(myStringKey));
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyTextMessage() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+
+            testPeer.expectBegin();
+
+            final String content = "Message-Content";
+            Queue queue = context.createQueue("myQueue");
+
+            DescribedType amqpValueContent = new AmqpValueDescribedType(content);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            JMSConsumer messageConsumer = context.createConsumer(queue);
+            String received = messageConsumer.receiveBody(String.class, 3000);
+
+            assertNotNull(received);
+            assertEquals(content, received);
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyObjectMessage() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+
+            testPeer.expectBegin();
+
+            Queue queue = context.createQueue("myQueue");
+
+            PropertiesDescribedType properties = new PropertiesDescribedType();
+            properties.setContentType(Symbol.valueOf(AmqpMessageSupport.SERIALIZED_JAVA_OBJECT_CONTENT_TYPE));
+
+            String expectedContent = "expectedContent";
+
+            ByteArrayOutputStream baos = new ByteArrayOutputStream();
+            ObjectOutputStream oos = new ObjectOutputStream(baos);
+            oos.writeObject(expectedContent);
+            oos.flush();
+            oos.close();
+            byte[] bytes = baos.toByteArray();
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_OBJECT_MESSAGE);
+
+            DescribedType dataContent = new DataDescribedType(new Binary(bytes));
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, properties, null, dataContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            JMSConsumer messageConsumer = context.createConsumer(queue);
+            String received = messageConsumer.receiveBody(String.class, 3000);
+
+            assertNotNull(received);
+            assertEquals(expectedContent, received);
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyBytesMessage() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+
+            testPeer.expectBegin();
+
+            Queue queue = context.createQueue("myQueue");
+
+            PropertiesDescribedType properties = new PropertiesDescribedType();
+            properties.setContentType(Symbol.valueOf(AmqpMessageSupport.OCTET_STREAM_CONTENT_TYPE));
+
+            MessageAnnotationsDescribedType msgAnnotations = null;
+            msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(AmqpMessageSupport.JMS_MSG_TYPE, AmqpMessageSupport.JMS_BYTES_MESSAGE);
+
+            final byte[] expectedContent = "expectedContent".getBytes();
+            DescribedType dataContent = new DataDescribedType(new Binary(expectedContent));
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, properties, null, dataContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            JMSConsumer messageConsumer = context.createConsumer(queue);
+            byte[] received = messageConsumer.receiveBody(byte[].class, 3000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(received);
+            assertTrue(Arrays.equals(expectedContent, received));
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyFailsDoesNotAcceptMessageAutoAck() throws Exception {
+        doTestReceiveBodyFailsDoesNotAcceptMessage(JMSContext.AUTO_ACKNOWLEDGE);
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyFailsDoesNotAcceptMessageDupsOk() throws Exception {
+        doTestReceiveBodyFailsDoesNotAcceptMessage(JMSContext.DUPS_OK_ACKNOWLEDGE);
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyFailsDoesNotAcceptMessageClientAck() throws Exception {
+        doTestReceiveBodyFailsDoesNotAcceptMessage(JMSContext.CLIENT_ACKNOWLEDGE);
+    }
+
+    public void doTestReceiveBodyFailsDoesNotAcceptMessage(int sessionMode) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, sessionMode); // exercise the session mode under test, not the default
+
+            testPeer.expectBegin();
+
+            final String content = "Message-Content";
+            Queue queue = context.createQueue("myQueue");
+
+            DescribedType amqpValueContent = new AmqpValueDescribedType(content);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
+            testPeer.expectEnd(); // no disposition is scripted before the end: the failed read must not accept the message
+            testPeer.expectClose();
+
+            JMSConsumer messageConsumer = context.createConsumer(queue);
+            try {
+                messageConsumer.receiveBody(Boolean.class, 3000);
+                fail("Should not read as Boolean type");
+            } catch (MessageFormatRuntimeException mfre) { // expected: the String body cannot be read as a Boolean
+            }
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyFailsThenAcceptsOnSuccessfullyNextCallAutoAck() throws Exception {
+        doTestReceiveBodyFailsThenCalledWithCorrectType(JMSContext.AUTO_ACKNOWLEDGE); // second read with the right type should succeed and be accepted
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyFailsThenAcceptsOnSuccessfullyNextCallDupsOk() throws Exception {
+        doTestReceiveBodyFailsThenCalledWithCorrectType(JMSContext.DUPS_OK_ACKNOWLEDGE); // second read with the right type should succeed and be accepted
+    }
+
+    @Test(timeout = 20000)
+    public void testReceiveBodyFailsThenGetNullOnNextAttemptClientAck() throws Exception {
+        doTestReceiveBodyFailsThenCalledWithCorrectType(JMSContext.CLIENT_ACKNOWLEDGE); // second read is expected to return null in this mode
+    }
+
+    public void doTestReceiveBodyFailsThenCalledWithCorrectType(int sessionMode) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, sessionMode); // the mode-specific branches below only apply if the context really uses sessionMode
+
+            testPeer.expectBegin();
+
+            final String content = "Message-Content";
+            Queue queue = context.createQueue("myQueue");
+
+            DescribedType amqpValueContent = new AmqpValueDescribedType(content);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueContent);
+
+            JMSConsumer messageConsumer = context.createConsumer(queue);
+            try {
+                messageConsumer.receiveBody(Boolean.class, 3000);
+                fail("Should not read as Boolean type");
+            } catch (MessageFormatRuntimeException mfre) { // expected: the String body cannot be read as a Boolean
+            }
+
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            if (sessionMode == JMSContext.AUTO_ACKNOWLEDGE ||
+                sessionMode == JMSContext.DUPS_OK_ACKNOWLEDGE) {
+
+                testPeer.expectDispositionThatIsAcceptedAndSettled(); // only these modes script an accept for the retried read
+            }
+
+            String received = messageConsumer.receiveBody(String.class, 3000);
+
+            if (sessionMode == JMSContext.AUTO_ACKNOWLEDGE ||
+                sessionMode == JMSContext.DUPS_OK_ACKNOWLEDGE) {
+
+                assertNotNull(received);
+                assertEquals(content, received);
+            } else {
+                assertNull(received);
+            }
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSContextIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSContextIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSContextIntegrationTest.java
new file mode 100644
index 0000000..7c3d2d1
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSContextIntegrationTest.java
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.util.UUID;
+
+import javax.jms.JMSContext;
+import javax.jms.JMSProducer;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class JMSContextIntegrationTest extends QpidJmsTestCase {
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    private Symbol[] SERVER_ANONYMOUS_RELAY = new Symbol[]{ANONYMOUS_RELAY};
+
+    @Test(timeout = 20000)
+    public void testCreateAndCloseContext() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateContextWithClientId() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, false, null, null, null, true);
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateContextAndSetClientID() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, false, null, null, null, false);
+            context.setClientID(UUID.randomUUID().toString());
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateAutoAckSessionByDefault() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+            assertEquals(JMSContext.AUTO_ACKNOWLEDGE, context.getSessionMode());
+            testPeer.expectBegin();
+            context.createTopic("TopicName");
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateContextWithTransactedSessionMode() throws Exception {
+        Binary txnId = new Binary(new byte[]{ (byte) 5, (byte) 6, (byte) 7, (byte) 8});
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, JMSContext.SESSION_TRANSACTED);
+            assertEquals(JMSContext.SESSION_TRANSACTED, context.getSessionMode());
+
+            // A session should be created and a coordinator attached since this is a TX
+            // session; a new TX is then declared and, once the context is closed, the TX
+            // should be discharged as a rollback.
+            testPeer.expectBegin();
+            testPeer.expectCoordinatorAttach();
+            testPeer.expectDeclare(txnId);
+            testPeer.expectDischarge(txnId, true);
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            context.createTopic("TopicName");
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testCreateContextFromContextWithSessionsActive() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer);
+            assertEquals(JMSContext.AUTO_ACKNOWLEDGE, context.getSessionMode());
+            testPeer.expectBegin();
+            context.createTopic("TopicName");
+
+            // Creating a second context should not create a new session yet; once its session
+            // is created on demand, closing the second context should only close that session
+            JMSContext other = context.createContext(JMSContext.CLIENT_ACKNOWLEDGE);
+            assertEquals(JMSContext.CLIENT_ACKNOWLEDGE, other.getSessionMode());
+            testPeer.expectBegin();
+            testPeer.expectEnd();
+            other.createTopic("TopicName");
+            other.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+
+            // Now the connection should close down.
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testOnlyOneProducerCreatedInSingleContext() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+            assertEquals(JMSContext.AUTO_ACKNOWLEDGE, context.getSessionMode());
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            // One producer created should send an attach.
+            JMSProducer producer1 = context.createProducer();
+            assertNotNull(producer1);
+
+            // An additional one should not result in an attach
+            JMSProducer producer2 = context.createProducer();
+            assertNotNull(producer2);
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testEachContextGetsItsOwnProducer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+            assertEquals(JMSContext.AUTO_ACKNOWLEDGE, context.getSessionMode());
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            // One producer created should send an attach.
+            JMSProducer producer1 = context.createProducer();
+            assertNotNull(producer1);
+
+            // A producer from a second context should result in its own session and attach
+            JMSContext other = context.createContext(JMSContext.AUTO_ACKNOWLEDGE);
+            JMSProducer producer2 = other.createProducer();
+            assertNotNull(producer2);
+
+            testPeer.expectEnd();
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            other.close();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSProducerIntegrationTest.java
new file mode 100644
index 0000000..4096c1f
--- /dev/null
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/JMSProducerIntegrationTest.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.integration;
+
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
+import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import javax.jms.JMSContext;
+import javax.jms.JMSProducer;
+import javax.jms.Message;
+import javax.jms.Queue;
+
+import org.apache.qpid.jms.test.QpidJmsTestCase;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.ApplicationPropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class JMSProducerIntegrationTest extends QpidJmsTestCase {
+
+    private final IntegrationTestFixture testFixture = new IntegrationTestFixture();
+
+    private Symbol[] SERVER_ANONYMOUS_RELAY = new Symbol[]{ANONYMOUS_RELAY};
+
+    private static final String NULL_STRING_PROP = "nullStringProperty";
+    private static final String NULL_STRING_PROP_VALUE = null;
+    private static final String STRING_PROP = "stringProperty";
+    private static final String STRING_PROP_VALUE = "string";
+    private static final String BOOLEAN_PROP = "booleanProperty";
+    private static final boolean BOOLEAN_PROP_VALUE = true;
+    private static final String BYTE_PROP = "byteProperty";
+    private static final byte   BYTE_PROP_VALUE = (byte)1;
+    private static final String SHORT_PROP = "shortProperty";
+    private static final short  SHORT_PROP_VALUE = (short)1;
+    private static final String INT_PROP = "intProperty";
+    private static final int    INT_PROP_VALUE = Integer.MAX_VALUE;
+    private static final String LONG_PROP = "longProperty";
+    private static final long   LONG_PROP_VALUE = Long.MAX_VALUE;
+    private static final String FLOAT_PROP = "floatProperty";
+    private static final float  FLOAT_PROP_VALUE = Float.MAX_VALUE;
+    private static final String DOUBLE_PROP = "doubleProperty";
+    private static final double DOUBLE_PROP_VALUE = Double.MAX_VALUE;
+
+    @Test(timeout = 20000)
+    public void testCreateProducer() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            JMSProducer producer = context.createProducer();
+            assertNotNull(producer);
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJMSProducerHasDefaultConfiguration() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            JMSProducer producer = context.createProducer();
+            assertNotNull(producer);
+
+            assertEquals(Message.DEFAULT_DELIVERY_DELAY, producer.getDeliveryDelay());
+            assertEquals(Message.DEFAULT_DELIVERY_MODE, producer.getDeliveryMode());
+            assertEquals(Message.DEFAULT_PRIORITY, producer.getPriority());
+            assertEquals(Message.DEFAULT_TIME_TO_LIVE, producer.getTimeToLive());
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJMSProducerSetPropertySendsApplicationProperties() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            String queueName = "myQueue";
+            Queue queue = context.createQueue(queueName);
+            JMSProducer producer = context.createProducer();
+
+            ApplicationPropertiesSectionMatcher appPropsMatcher = new ApplicationPropertiesSectionMatcher(true);
+            appPropsMatcher.withEntry(NULL_STRING_PROP, nullValue());
+            appPropsMatcher.withEntry(STRING_PROP, equalTo(STRING_PROP_VALUE));
+            appPropsMatcher.withEntry(BOOLEAN_PROP, equalTo(BOOLEAN_PROP_VALUE));
+            appPropsMatcher.withEntry(BYTE_PROP, equalTo(BYTE_PROP_VALUE));
+            appPropsMatcher.withEntry(SHORT_PROP, equalTo(SHORT_PROP_VALUE));
+            appPropsMatcher.withEntry(INT_PROP, equalTo(INT_PROP_VALUE));
+            appPropsMatcher.withEntry(LONG_PROP, equalTo(LONG_PROP_VALUE));
+            appPropsMatcher.withEntry(FLOAT_PROP, equalTo(FLOAT_PROP_VALUE));
+            appPropsMatcher.withEntry(DOUBLE_PROP, equalTo(DOUBLE_PROP_VALUE));
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher);
+            testPeer.expectTransfer(messageMatcher);
+
+            producer.setProperty(NULL_STRING_PROP, NULL_STRING_PROP_VALUE);
+            producer.setProperty(STRING_PROP, STRING_PROP_VALUE);
+            producer.setProperty(BOOLEAN_PROP, BOOLEAN_PROP_VALUE);
+            producer.setProperty(BYTE_PROP, BYTE_PROP_VALUE);
+            producer.setProperty(SHORT_PROP, SHORT_PROP_VALUE);
+            producer.setProperty(INT_PROP, INT_PROP_VALUE);
+            producer.setProperty(LONG_PROP, LONG_PROP_VALUE);
+            producer.setProperty(FLOAT_PROP, FLOAT_PROP_VALUE);
+            producer.setProperty(DOUBLE_PROP, DOUBLE_PROP_VALUE);
+
+            producer.send(queue, "test");
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testJMSProducerPropertyOverridesMessageValue() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            JMSContext context = testFixture.createJMSContext(testPeer, SERVER_ANONYMOUS_RELAY);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            String queueName = "myQueue";
+            Queue queue = context.createQueue(queueName);
+            Message message = context.createMessage();
+            JMSProducer producer = context.createProducer();
+
+            ApplicationPropertiesSectionMatcher appPropsMatcher = new ApplicationPropertiesSectionMatcher(true);
+            appPropsMatcher.withEntry(STRING_PROP, equalTo(STRING_PROP_VALUE));
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withTo(equalTo(queueName));
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propsMatcher);
+            messageMatcher.setApplicationPropertiesMatcher(appPropsMatcher);
+            testPeer.expectTransfer(messageMatcher);
+
+            message.setStringProperty(STRING_PROP, "ThisShouldNotBeTransmitted");
+            producer.setProperty(STRING_PROP, STRING_PROP_VALUE);
+            producer.send(queue, message);
+
+            testPeer.expectEnd();
+            testPeer.expectClose();
+
+            context.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
index 8141751..10bcffd 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/MapMessageIntegrationTest.java
@@ -20,8 +20,10 @@ package org.apache.qpid.jms.integration;
 
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 
 import java.util.Arrays;
 import java.util.LinkedHashMap;
@@ -31,6 +33,7 @@ import javax.jms.Connection;
 import javax.jms.MapMessage;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageFormatException;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
@@ -116,10 +119,10 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
             testPeer.expectReceiverAttach();
             testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueSectionContent);
             testPeer.expectDispositionThatIsAcceptedAndSettled();
+            testPeer.expectClose();
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
             Message receivedMessage = messageConsumer.receive(3000);
-            testPeer.waitForAllHandlersToComplete(3000);
 
             // verify the content is as expected
             assertNotNull("Message was not received", receivedMessage);
@@ -137,6 +140,23 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
             assertEquals("Unexpected long value", myLong, receivedMapMessage.getLong(myLongKey));
             assertEquals("Unexpected short value", myShort, receivedMapMessage.getShort(myShortKey));
             assertEquals("Unexpected UTF value", myString, receivedMapMessage.getString(myStringKey));
+
+            assertTrue(receivedMapMessage.isBodyAssignableTo(Map.class));
+            assertTrue(receivedMapMessage.isBodyAssignableTo(Object.class));
+            assertFalse(receivedMapMessage.isBodyAssignableTo(Boolean.class));
+            assertFalse(receivedMapMessage.isBodyAssignableTo(byte[].class));
+
+            assertNotNull(receivedMapMessage.getBody(Object.class));
+            assertNotNull(receivedMapMessage.getBody(Map.class));
+            try {
+                receivedMapMessage.getBody(byte[].class);
+                fail("Cannot read TextMessage with this type.");
+            } catch (MessageFormatException mfe) {
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
         }
     }
 
@@ -219,9 +239,28 @@ public class MapMessageIntegrationTest extends QpidJmsTestCase {
             messageMatcher.setPropertiesMatcher(propertiesMatcher);
             messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(map));
 
-            // send the message
             testPeer.expectTransfer(messageMatcher);
+            testPeer.expectClose();
+
+            // send the message
             producer.send(mapMessage);
+
+            assertTrue(mapMessage.isBodyAssignableTo(Map.class));
+            assertTrue(mapMessage.isBodyAssignableTo(Object.class));
+            assertFalse(mapMessage.isBodyAssignableTo(Boolean.class));
+            assertFalse(mapMessage.isBodyAssignableTo(byte[].class));
+
+            assertNotNull(mapMessage.getBody(Object.class));
+            assertNotNull(mapMessage.getBody(Map.class));
+            try {
+                mapMessage.getBody(byte[].class);
+                fail("Cannot read TextMessage with this type.");
+            } catch (MessageFormatException mfe) {
+            }
+
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(3000);
         }
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org