You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:07 UTC

[6/7] qpid-jms git commit: QPIDJMS-207 Adds dependency on JMS 2.0 API and initial implementation.

QPIDJMS-207 Adds dependency on JMS 2.0 API and initial implementation.

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0c39522c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0c39522c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0c39522c

Branch: refs/heads/master
Commit: 0c39522cde1c845d8b57978dddfa931ee440e9e3
Parents: 3a03663
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 12 14:39:42 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Sep 12 14:39:42 2016 -0400

----------------------------------------------------------------------
 pom.xml                                         |   7 +-
 qpid-jms-client/pom.xml                         |   6 +-
 .../apache/qpid/jms/JmsCompletionListener.java  |  47 --
 .../java/org/apache/qpid/jms/JmsConnection.java |  24 +
 .../apache/qpid/jms/JmsConnectionFactory.java   |  34 +-
 .../java/org/apache/qpid/jms/JmsConsumer.java   | 135 +++++
 .../java/org/apache/qpid/jms/JmsContext.java    | 510 +++++++++++++++++
 .../org/apache/qpid/jms/JmsMessageConsumer.java |  53 ++
 .../org/apache/qpid/jms/JmsMessageProducer.java | 106 ++--
 .../java/org/apache/qpid/jms/JmsProducer.java   | 454 +++++++++++++++
 .../java/org/apache/qpid/jms/JmsSession.java    |  80 ++-
 .../jms/exceptions/JmsExceptionSupport.java     |  66 +++
 .../qpid/jms/message/JmsBytesMessage.java       |  15 +
 .../apache/qpid/jms/message/JmsMapMessage.java  |  24 +
 .../org/apache/qpid/jms/message/JmsMessage.java | 111 ++--
 .../message/JmsMessagePropertyIntercepter.java  |  73 +--
 .../jms/message/JmsMessagePropertySupport.java  | 124 ++++
 .../qpid/jms/message/JmsObjectMessage.java      |  19 +
 .../qpid/jms/message/JmsStreamMessage.java      |   5 +
 .../apache/qpid/jms/message/JmsTextMessage.java |  11 +
 .../message/facade/JmsBytesMessageFacade.java   |   6 +
 .../jms/message/facade/JmsMessageFacade.java    |  26 +
 .../amqp/message/AmqpJmsBytesMessageFacade.java |  24 +-
 .../amqp/message/AmqpJmsMapMessageFacade.java   |   5 +
 .../amqp/message/AmqpJmsMessageFacade.java      |  17 +
 .../message/AmqpJmsObjectMessageFacade.java     |   5 +
 .../message/AmqpJmsStreamMessageFacade.java     |   5 +
 .../amqp/message/AmqpJmsTextMessageFacade.java  |   9 +
 .../amqp/message/AmqpObjectTypeDelegate.java    |   3 +
 .../message/AmqpSerializedObjectDelegate.java   |   9 +
 .../amqp/message/AmqpTypedObjectDelegate.java   |   9 +
 .../org/apache/qpid/jms/JmsConnectionTest.java  |  10 +-
 .../qpid/jms/JmsConnectionTestSupport.java      |   9 +
 .../integration/ConnectionIntegrationTest.java  |  50 ++
 .../jms/integration/IntegrationTestFixture.java |  73 ++-
 .../integration/JMSConsumerIntegrationTest.java | 559 +++++++++++++++++++
 .../integration/JMSContextIntegrationTest.java  | 198 +++++++
 .../integration/JMSProducerIntegrationTest.java | 200 +++++++
 .../integration/MapMessageIntegrationTest.java  |  43 +-
 .../jms/integration/MessageIntegrationTest.java |  40 ++
 .../ObjectMessageIntegrationTest.java           | 116 +++-
 .../PresettledProducerIntegrationTest.java      |   4 +-
 .../integration/ProducerIntegrationTest.java    |   4 +-
 .../jms/integration/SessionIntegrationTest.java |  31 +-
 .../StreamMessageIntegrationTest.java           |  63 ++-
 .../integration/TextMessageIntegrationTest.java |  83 ++-
 .../apache/qpid/jms/message/JmsMessageTest.java | 335 +++++++++++
 .../facade/test/JmsTestBytesMessageFacade.java  |  25 +-
 .../facade/test/JmsTestMapMessageFacade.java    |   5 +
 .../facade/test/JmsTestMessageFacade.java       |  16 +
 .../facade/test/JmsTestObjectMessageFacade.java |   5 +
 .../facade/test/JmsTestStreamMessageFacade.java |   5 +
 .../facade/test/JmsTestTextMessageFacade.java   |   5 +
 .../jms/message/foreign/ForeignJmsMessage.java  |  20 +
 .../jms/producer/JmsMessageProducerTest.java    |   4 +-
 .../qpid/jms/producer/JmsProducerTest.java      | 182 ++++++
 .../failover/FailoverIntegrationTest.java       |   4 +-
 .../qpid/jms/JmsMessageIntegrityTest.java       |  28 +
 58 files changed, 3825 insertions(+), 314 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bd7a642..fbb9396 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,8 @@
     <proton-version>0.14.0</proton-version>
     <netty-version>4.0.41.Final</netty-version>
     <slf4j-version>1.7.21</slf4j-version>
-    <geronimo-jms-1-1-spec-version>1.1.1</geronimo-jms-1-1-spec-version>
+    <!-- <geronimo-jms-1-1-spec-version>1.1.1</geronimo-jms-1-1-spec-version> -->
+    <geronimo.jms.2.spec.version>1.0-alpha-2</geronimo.jms.2.spec.version>
     <!-- Test Dependency Versions for this Project -->
     <activemq-version>5.14.0</activemq-version>
     <junit-version>4.12</junit-version>
@@ -113,8 +114,8 @@
       </dependency>
       <dependency>
         <groupId>org.apache.geronimo.specs</groupId>
-        <artifactId>geronimo-jms_1.1_spec</artifactId>
-        <version>${geronimo-jms-1-1-spec-version}</version>
+        <artifactId>geronimo-jms_2.0_spec</artifactId>
+        <version>${geronimo.jms.2.spec.version}</version>
       </dependency>
       <dependency>
         <groupId>io.netty</groupId>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index a5d1dd6..24b647e 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -38,8 +38,12 @@
     </dependency>
     <dependency>
       <groupId>org.apache.geronimo.specs</groupId>
-      <artifactId>geronimo-jms_1.1_spec</artifactId>
+      <artifactId>geronimo-jms_2.0_spec</artifactId>
     </dependency>
+    <!-- <dependency>
+      <groupId>org.apache.geronimo.specs</groupId>
+      <artifactId>geronimo-jms_1.1_spec</artifactId>
+    </dependency> -->
     <dependency>
       <groupId>org.apache.qpid</groupId>
       <artifactId>proton-j</artifactId>

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
deleted file mode 100644
index 7a6c4d6..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms;
-
-import javax.jms.Message;
-
-/**
- * Interface used to implement listeners for asynchronous {@link javax.jms.Message}
- * sends which will be notified on successful completion of a send or be notified of an
- * error that was encountered while attempting to send a {@link javax.jms.Message}.
- */
-public interface JmsCompletionListener {
-
-    /**
-     * Called when an asynchronous send operation completes successfully.
-     *
-     * @param message
-     *      the {@link javax.jms.Message} that was successfully sent.
-     */
-    void onCompletion(Message message);
-
-    /**
-     * Called when an asynchronous send operation fails to complete, the state
-     * of the send is unknown at this point.
-     *
-     * @param message
-     *      the {@link javax.jms.Message} that was to be sent.
-     * @param exception
-     *      the {@link java.lang.Exception} that describes the send error.
-     */
-    void onException(Message message, Exception exception);
-
-}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index a04d1b3..79fe8d8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -257,6 +257,16 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     }
 
     @Override
+    public Session createSession() throws JMSException {
+        return createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    @Override
+    public Session createSession(int acknowledgeMode) throws JMSException {
+        return createSession(acknowledgeMode == Session.SESSION_TRANSACTED ? true : false, acknowledgeMode);
+    }
+
+    @Override
     public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
         checkClosedOrFailed();
         connect();
@@ -347,6 +357,20 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
     }
 
     @Override
+    public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        throw new JMSException("Not supported");
+    }
+
+    @Override
+    public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+        checkClosedOrFailed();
+        connect();
+        throw new JMSException("Not supported");
+    }
+
+    @Override
     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
                                                               String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
         checkClosedOrFailed();

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index 217749c..4105842 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.ExceptionListener;
+import javax.jms.JMSContext;
 import javax.jms.JMSException;
 import javax.jms.QueueConnection;
 import javax.jms.QueueConnectionFactory;
@@ -234,6 +235,35 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
         }
     }
 
+    //----- JMSContext Creation methods --------------------------------------//
+
+    @Override
+    public JMSContext createContext() {
+        return createContext(getUsername(), getPassword(), JMSContext.AUTO_ACKNOWLEDGE);
+    }
+
+    @Override
+    public JMSContext createContext(int sessionMode) {
+        return createContext(getUsername(), getPassword(), sessionMode);
+    }
+
+    @Override
+    public JMSContext createContext(String username, String password) {
+        return createContext(username, password, JMSContext.AUTO_ACKNOWLEDGE);
+    }
+
+    @Override
+    public JMSContext createContext(String username, String password, int sessionMode) {
+        try {
+            JmsConnection connection = (JmsConnection) createConnection(username, password);
+            return new JmsContext(connection, sessionMode);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    //----- Internal Support Methods -----------------------------------------//
+
     protected Provider createProvider(URI remoteURI) throws Exception {
         if (remoteURI == null) {
             remoteURI = new URI(getDefaultRemoteAddress());
@@ -279,9 +309,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
         this.connectionIdGenerator = connectionIdGenerator;
     }
 
-    //////////////////////////////////////////////////////////////////////////
-    // Property getters and setters
-    //////////////////////////////////////////////////////////////////////////
+    //----- Property Access Methods ------------------------------------------//
 
     /**
      * @return the remoteURI

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConsumer.java
new file mode 100644
index 0000000..19691e7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConsumer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSConsumer;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+
+@SuppressWarnings("unused")
+public class JmsConsumer implements JMSConsumer, AutoCloseable {
+
+    private final JmsSession session;
+    private final JmsMessageConsumer consumer;
+
+    public JmsConsumer(JmsSession session, JmsMessageConsumer consumer) {
+        this.session = session;
+        this.consumer = consumer;
+    }
+
+    @Override
+    public void close() {
+        try {
+            consumer.close();
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    //----- MessageConsumer Property Methods ---------------------------------//
+
+    @Override
+    public MessageListener getMessageListener() {
+        try {
+            return consumer.getMessageListener();
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    @Override
+    public String getMessageSelector() {
+        try {
+            return consumer.getMessageSelector();
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    @Override
+    public void setMessageListener(MessageListener listener) {
+        try {
+            consumer.setMessageListener(listener);
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    //----- Receive Methods --------------------------------------------------//
+
+    @Override
+    public Message receive() {
+        try {
+            return consumer.receive();
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    @Override
+    public Message receive(long timeout) {
+        try {
+            return consumer.receive(timeout);
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    @Override
+    public Message receiveNoWait() {
+        try {
+            return consumer.receiveNoWait();
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    @Override
+    public <T> T receiveBody(Class<T> desired) {
+        try {
+            return consumer.receiveBody(desired, -1);
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    @Override
+    public <T> T receiveBody(Class<T> desired, long timeout) {
+        try {
+            // Configure for infinite wait when timeout is zero (JMS Spec)
+            if (timeout == 0) {
+                timeout = -1;
+            }
+
+            return consumer.receiveBody(desired, timeout);
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+
+    @Override
+    public <T> T receiveBodyNoWait(Class<T> desired) {
+        try {
+            return consumer.receiveBody(desired, 0);
+        } catch (JMSException e) {
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsContext.java
new file mode 100644
index 0000000..008da24
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsContext.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+public class JmsContext implements JMSContext, AutoCloseable {
+
+    private final JmsConnection connection;
+    private final AtomicLong connectionRefCount;
+    private final int sessionMode;
+
+    private JmsSession session;
+    private JmsMessageProducer sharedProducer;
+    private boolean autoStart = true;
+
+    public JmsContext(JmsConnection connection, int sessionMode) {
+        this(connection, sessionMode, new AtomicLong(1));
+    }
+
+    private JmsContext(JmsConnection connection, int sessionMode, AtomicLong connectionRefCount) {
+        this.connection = connection;
+        this.sessionMode = sessionMode;
+        this.connectionRefCount = connectionRefCount;
+    }
+
+    @Override
+    public void start() {
+        try {
+            connection.start();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public void stop() {
+        try {
+            connection.stop();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public void close() {
+        JMSRuntimeException failure = null;
+
+        synchronized (this) {
+            try {
+                if (session != null) {
+                    session.close();
+                }
+            } catch (JMSException jmse) {
+                failure = JmsExceptionSupport.createRuntimeException(jmse);
+            }
+
+            if (connectionRefCount.decrementAndGet() == 0) {
+                try {
+                    connection.close();
+                } catch (JMSException jmse) {
+                    failure = JmsExceptionSupport.createRuntimeException(jmse);
+                }
+            }
+        }
+
+        if (failure != null) {
+            throw failure;
+        }
+    }
+
+    //----- Session state management -----------------------------------------//
+
+    @Override
+    public void acknowledge() {
+        if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
+            try {
+                getSession().acknowledge(ACK_TYPE.ACCEPTED);
+            } catch (JMSException jmse) {
+                throw JmsExceptionSupport.createRuntimeException(jmse);
+            }
+        }
+    }
+
+    @Override
+    public void commit() {
+        try {
+            getSession().commit();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public void rollback() {
+        try {
+            getSession().rollback();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public void recover() {
+        try {
+            getSession().recover();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public void unsubscribe(String name) {
+        try {
+            getSession().unsubscribe(name);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    //----- Message Factory methods ------------------------------------------//
+
+    @Override
+    public BytesMessage createBytesMessage() {
+        try {
+            return getSession().createBytesMessage();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public MapMessage createMapMessage() {
+        try {
+            return getSession().createMapMessage();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public Message createMessage() {
+        try {
+            return getSession().createMessage();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage() {
+        try {
+            return getSession().createObjectMessage();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public ObjectMessage createObjectMessage(Serializable object) {
+        try {
+            return getSession().createObjectMessage(object);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public StreamMessage createStreamMessage() {
+        try {
+            return getSession().createStreamMessage();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public TextMessage createTextMessage() {
+        try {
+            return getSession().createTextMessage();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public TextMessage createTextMessage(String text) {
+        try {
+            return getSession().createTextMessage(text);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    //----- Destination Creation ---------------------------------------------//
+
+    @Override
+    public Queue createQueue(String queueName) {
+        try {
+            return getSession().createQueue(queueName);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public Topic createTopic(String topicName) {
+        try {
+            return getSession().createTopic(topicName);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public TemporaryQueue createTemporaryQueue() {
+        try {
+            return getSession().createTemporaryQueue();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public TemporaryTopic createTemporaryTopic() {
+        try {
+            return getSession().createTemporaryTopic();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    //----- JMSContext factory methods --------------------------------------//
+
+    @Override
+    public JMSContext createContext(int sessionMode) {
+        synchronized (this) {
+            if (connectionRefCount.get() == 0) {
+                throw new IllegalStateRuntimeException("The Connection is closed");
+            }
+
+            connectionRefCount.incrementAndGet();
+
+            return new JmsContext(connection, sessionMode, connectionRefCount);
+        }
+    }
+
+    //----- JMSProducer factory methods --------------------------------------//
+
+    @Override
+    public JMSProducer createProducer() {
+        try {
+            if (sharedProducer == null) {
+                synchronized (this) {
+                    if (sharedProducer == null) {
+                        sharedProducer = (JmsMessageProducer) getSession().createProducer(null);
+                    }
+                }
+            }
+
+            return new JmsProducer(getSession(), sharedProducer);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    //----- JMSConsumer factory methods --------------------------------------//
+
+    @Override
+    public JMSConsumer createConsumer(Destination destination) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createConsumer(destination)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSConsumer createConsumer(Destination destination, String selector) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createConsumer(destination, selector)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSConsumer createConsumer(Destination destination, String selector, boolean noLocal) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createConsumer(destination, selector, noLocal)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSConsumer createDurableConsumer(Topic topic, String name) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createDurableConsumer(topic, name)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSConsumer createDurableConsumer(Topic topic, String name, String selector, boolean noLocal) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createDurableConsumer(topic, name, selector, noLocal)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSConsumer createSharedConsumer(Topic topic, String name) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createSharedConsumer(topic, name)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSConsumer createSharedConsumer(Topic topic, String name, String selector) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createSharedConsumer(topic, name, selector)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSConsumer createSharedDurableConsumer(Topic topic, String name) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createSharedDurableConsumer(topic, name)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String selector) {
+        try {
+            return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createSharedDurableConsumer(topic, name, selector)));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    //----- QueueBrowser Factory Methods -------------------------------------//
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue) {
+        try {
+            return startIfNeeded(getSession().createBrowser(queue));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public QueueBrowser createBrowser(Queue queue, String selector) {
+        try {
+            return startIfNeeded(getSession().createBrowser(queue, selector));
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    //----- Get or Set Context and Session values ----------------------------//
+
+    @Override
+    public boolean getAutoStart() {
+        return autoStart;
+    }
+
+    @Override
+    public void setAutoStart(boolean autoStart) {
+        this.autoStart = autoStart;
+    }
+
+    @Override
+    public String getClientID() {
+        try {
+            return connection.getClientID();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public void setClientID(String clientID) {
+        try {
+            connection.setClientID(clientID);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public ExceptionListener getExceptionListener() {
+        try {
+            return connection.getExceptionListener();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public void setExceptionListener(ExceptionListener listener) {
+        try {
+            connection.setExceptionListener(listener);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public ConnectionMetaData getMetaData() {
+        try {
+            return connection.getMetaData();
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public int getSessionMode() {
+        return sessionMode;
+    }
+
+    @Override
+    public boolean getTransacted() {
+        return sessionMode == JMSContext.SESSION_TRANSACTED;
+    }
+
+    //----- Internal implementation methods ----------------------------------//
+
+    private JmsSession getSession() {
+        if (session == null) {
+            synchronized (this) {
+                if (session == null) {
+                    try {
+                        session = (JmsSession) connection.createSession(getSessionMode());
+                    } catch (JMSException jmse) {
+                        throw JmsExceptionSupport.createRuntimeException(jmse);
+                    }
+                }
+            }
+        }
+
+        return session;
+    }
+
+    private QueueBrowser startIfNeeded(QueueBrowser browser) throws JMSException {
+        if (getAutoStart()) {
+            connection.start();
+        }
+
+        return browser;
+    }
+
+    private JmsConsumer startIfNeeded(JmsConsumer consumer) throws JMSException {
+        if (getAutoStart()) {
+            connection.start();
+        }
+
+        return consumer;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 18fd764..ea93019 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -26,6 +26,7 @@ import javax.jms.IllegalStateException;
 import javax.jms.JMSException;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageFormatException;
 import javax.jms.MessageListener;
 import javax.jms.Session;
 
@@ -213,6 +214,58 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
     }
 
     /**
+     * Reads the next available message for this consumer and returns the body of that message
+     * if the type requested matches that of the message.  The amount of time this method blocks
+     * is based on the timeout value.
+     *
+     *   {@literal timeout < 0} then it blocks until a message is received.
+     *   {@literal timeout = 0} then it returns the body immediately or null if none available.
+     *   {@literal timeout > 0} then it blocks up to timeout amount of time.
+     *
+     * @param desired
+     *      The type to assign the body of the message to for return.
+     * @param timeout
+     *      The time to wait for an incoming message before this method returns null.
+     *
+     * @return the assigned body of the next available message or null if the consumer is closed
+     *         or the specified timeout elapses.
+     *
+     * @throws MessageFormatException if the message body cannot be assigned to the requested type.
+     * @throws JMSException if an error occurs while receiving the next message.
+     */
+    public <T> T receiveBody(Class<T> desired, long timeout) throws JMSException {
+        checkClosed();
+        checkMessageListener();
+        // Both stay null when nothing is dequeued, in which case this method returns null.
+        T messageBody = null;
+        JmsInboundMessageDispatch envelope = null;
+        // The finally block acknowledges whatever envelope is still referenced when it runs.
+        try {
+            envelope = dequeue(timeout, connection.isReceiveLocalOnly());
+            if (envelope != null) {
+                messageBody = envelope.getMessage().getBody(desired);
+            }
+        } catch (MessageFormatException mfe) {
+            // Should behave as if receiveBody never happened in these modes.
+            if (acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
+                acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE) {
+                // Re-submit the message (presumably at the queue head - confirm); null skips the ack.
+                envelope.setEnqueueFirst(true);
+                onInboundMessage(envelope);
+                envelope = null;
+            }
+
+            throw mfe;
+        } finally {
+            if (envelope != null) {
+                ackFromReceive(envelope);
+            }
+        }
+
+        return messageBody;
+    }
+
+    /**
      * Used to get an enqueued message from the unconsumedMessages list. The
      * amount of time this method blocks is based on the timeout value.
      *

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 65812b7..6e9c96d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 
+import javax.jms.CompletionListener;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -43,6 +44,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
     protected final JmsConnection connection;
     protected JmsProducerInfo producerInfo;
     protected final boolean anonymousProducer;
+    protected long deliveryDelay = Message.DEFAULT_DELIVERY_DELAY;
     protected int deliveryMode = DeliveryMode.PERSISTENT;
     protected int priority = Message.DEFAULT_PRIORITY;
     protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
@@ -110,44 +112,50 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
     }
 
     @Override
+    public long getDeliveryDelay() throws JMSException {
+        checkClosed(); // propagates IllegalStateException from checkClosed()
+        return deliveryDelay; // the value stored by setDeliveryDelay, not the delivery mode
+    }
+
+    @Override
     public int getDeliveryMode() throws JMSException {
         checkClosed();
-        return this.deliveryMode;
+        return deliveryMode;
     }
 
     @Override
     public Destination getDestination() throws JMSException {
         checkClosed();
-        return this.producerInfo.getDestination();
+        return producerInfo.getDestination();
     }
 
     @Override
     public boolean getDisableMessageID() throws JMSException {
         checkClosed();
-        return this.disableMessageId;
+        return disableMessageId;
     }
 
     @Override
     public boolean getDisableMessageTimestamp() throws JMSException {
         checkClosed();
-        return this.disableTimestamp;
+        return disableTimestamp;
     }
 
     @Override
     public int getPriority() throws JMSException {
         checkClosed();
-        return this.priority;
+        return priority;
     }
 
     @Override
     public long getTimeToLive() throws JMSException {
         checkClosed();
-        return this.timeToLive;
+        return timeToLive;
     }
 
     @Override
     public void send(Message message) throws JMSException {
-        send(message, this.deliveryMode, this.priority, this.timeToLive);
+        send(message, deliveryMode, priority, timeToLive);
     }
 
     @Override
@@ -163,7 +171,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
 
     @Override
     public void send(Destination destination, Message message) throws JMSException {
-        send(destination, message, this.deliveryMode, this.priority, this.timeToLive);
+        send(destination, message, deliveryMode, priority, timeToLive);
     }
 
     @Override
@@ -177,37 +185,13 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         sendMessage(destination, message, deliveryMode, priority, timeToLive, null);
     }
 
-    /**
-     * Sends the message asynchronously and notifies the assigned listener on success or failure
-     *
-     * @param message
-     *      the {@link javax.jms.Message} to send.
-     * @param listener
-     *      the {@link JmsCompletionListener} to notify on send success or failure.
-     *
-     * @throws JMSException if an error occurs while attempting to send the Message.
-     */
-    public void send(Message message, JmsCompletionListener listener) throws JMSException {
-        send(message, this.deliveryMode, this.priority, this.timeToLive, listener);
+    @Override
+    public void send(Message message, CompletionListener listener) throws JMSException {
+        send(message, deliveryMode, priority, timeToLive, listener);
     }
 
-    /**
-     * Sends the message asynchronously and notifies the assigned listener on success or failure
-     *
-     * @param message
-     *      the {@link javax.jms.Message} to send.
-     * @param deliveryMode
-     *      the delivery mode to assign to the outbound Message.
-     * @param priority
-     *      the priority to assign to the outbound Message.
-     * @param timeToLive
-     *      the time to live value to assign to the outbound Message.
-     * @param listener
-     *      the {@link JmsCompletionListener} to notify on send success or failure.
-     *
-     * @throws JMSException if an error occurs while attempting to send the Message.
-     */
-    public void send(Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+    @Override
+    public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
         checkClosed();
 
         if (anonymousProducer) {
@@ -221,41 +205,13 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, listener);
     }
 
-    /**
-     * Sends the message asynchronously and notifies the assigned listener on success or failure
-     *
-     * @param destination
-     *      the Destination to send the given Message to.
-     * @param message
-     *      the {@link javax.jms.Message} to send.
-     * @param listener
-     *      the {@link JmsCompletionListener} to notify on send success or failure.
-     *
-     * @throws JMSException if an error occurs while attempting to send the Message.
-     */
-    public void send(Destination destination, Message message, JmsCompletionListener listener) throws JMSException {
+    @Override
+    public void send(Destination destination, Message message, CompletionListener listener) throws JMSException {
         send(destination, message, this.deliveryMode, this.priority, this.timeToLive, listener);
     }
 
-    /**
-     * Sends the message asynchronously and notifies the assigned listener on success or failure
-     *
-     * @param destination
-     *      the Destination to send the given Message to.
-     * @param message
-     *      the {@link javax.jms.Message} to send.
-     * @param deliveryMode
-     *      the delivery mode to assign to the outbound Message.
-     * @param priority
-     *      the priority to assign to the outbound Message.
-     * @param timeToLive
-     *      the time to live value to assign to the outbound Message.
-     * @param listener
-     *      the {@link JmsCompletionListener} to notify on send success or failure.
-     *
-     * @throws JMSException if an error occurs while attempting to send the Message.
-     */
-    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+    @Override
+    public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
         checkClosed();
 
         if (!anonymousProducer) {
@@ -269,12 +225,18 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
         sendMessage(destination, message, deliveryMode, priority, timeToLive, listener);
     }
 
-    private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+    private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
         if (destination == null) {
             throw new InvalidDestinationException("Don't understand null destinations");
         }
 
-        this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, listener);
+        this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, listener);
+    }
+
+    @Override
+    public void setDeliveryDelay(long deliveryDelay) throws JMSException {
+        checkClosed();
+        this.deliveryDelay = deliveryDelay;
     }
 
     @Override
@@ -318,7 +280,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
      * @return the next logical sequence for a Message sent from this Producer.
      */
     protected long getNextMessageSequence() {
-        return this.messageSequence.incrementAndGet();
+        return messageSequence.incrementAndGet();
     }
 
     protected void checkClosed() throws IllegalStateException {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
new file mode 100644
index 0000000..a90ec23
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.checkPropertyNameIsValid;
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.checkValidObject;
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.convertPropertyTo;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+
+public class JmsProducer implements JMSProducer {
+
+    private final JmsSession session;
+    private final JmsMessageProducer producer;
+
+    private CompletionListener completionListener;
+
+    // Message Headers
+    private String correlationId;
+    private String type;
+    private Destination replyTo;
+    private byte[] correlationIdBytes;
+
+    // Producer send configuration
+    private long deliveryDelay = Message.DEFAULT_DELIVERY_DELAY;
+    private int deliveryMode = DeliveryMode.PERSISTENT;
+    private int priority = Message.DEFAULT_PRIORITY;
+    private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+    private boolean disableMessageId;
+    private boolean disableTimestamp;
+
+    // Message Properties
+    private final Map<String, Object> messageProperties = new HashMap<String, Object>();
+
+    /**
+     * Create a new JMSProducer instance.
+     *
+     * The producer is backed by the given Session object and uses the shared MessageProducer
+     * instance to send all of its messages.
+     *
+     * @param session
+     *      The Session that created this JMSProducer
+     * @param producer
+     *      The shared MessageProducer owned by the parent Session.
+     */
+    public JmsProducer(JmsSession session, JmsMessageProducer producer) {
+        this.session = session;
+        this.producer = producer;
+    }
+
+    //----- Send Methods -----------------------------------------------------//
+
+    @Override
+    public JMSProducer send(Destination destination, Message message) {
+        try {
+            doSend(destination, message);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+
+        return this;
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, byte[] body) {
+        try {
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(body);
+            doSend(destination, message);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+
+        return this;
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, Map<String, Object> body) {
+        try {
+            MapMessage message = session.createMapMessage();
+            if (body != null) { // a null body sends a MapMessage with no entries instead of an NPE
+                for (Map.Entry<String, Object> entry : body.entrySet()) {
+                    message.setObject(entry.getKey(), entry.getValue());
+                }
+            }
+            doSend(destination, message);
+        } catch (JMSException jmse) { // rethrown unchecked: this signature declares no JMSException
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+        return this; // returned to allow call chaining
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, Serializable body) {
+        try {
+            ObjectMessage message = session.createObjectMessage();
+            message.setObject(body);
+            doSend(destination, message);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+
+        return this;
+    }
+
+    @Override
+    public JMSProducer send(Destination destination, String body) {
+        try {
+            TextMessage message = session.createTextMessage(body);
+            doSend(destination, message);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+
+        return this;
+    }
+
+    private void doSend(Destination destination, Message message) throws JMSException {
+        // Copy the properties and headers stored on this producer onto the message, then send it.
+        for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
+            message.setObjectProperty(entry.getKey(), entry.getValue());
+        }
+        // Headers that were never set (null) are left untouched on the message.
+        if (correlationId != null) {
+            message.setJMSCorrelationID(correlationId);
+        }
+        if (correlationIdBytes != null) { // applied after the String form when both are set
+            message.setJMSCorrelationIDAsBytes(correlationIdBytes);
+        }
+        if (type != null) {
+            message.setJMSType(type);
+        }
+        if (replyTo != null) {
+            message.setJMSReplyTo(replyTo);
+        }
+        // Send via the session using the shared producer and this object's own send settings.
+        session.send(producer, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
+    }
+
+    //----- Message Property Methods -----------------------------------------//
+
+    @Override
+    public JMSProducer clearProperties() {
+        messageProperties.clear();
+        return this;
+    }
+
+    @Override
+    public Set<String> getPropertyNames() {
+        return new HashSet<String>(messageProperties.keySet());
+    }
+
+    @Override
+    public boolean propertyExists(String name) {
+        return messageProperties.containsKey(name);
+    }
+
+    @Override
+    public boolean getBooleanProperty(String name) {
+        try {
+            return convertPropertyTo(name, messageProperties.get(name), Boolean.class);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public byte getByteProperty(String name) {
+        try {
+            return convertPropertyTo(name, messageProperties.get(name), Byte.class);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public double getDoubleProperty(String name) {
+        try {
+            return convertPropertyTo(name, messageProperties.get(name), Double.class);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public float getFloatProperty(String name) {
+        try {
+            return convertPropertyTo(name, messageProperties.get(name), Float.class);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public int getIntProperty(String name) {
+        try {
+            return convertPropertyTo(name, messageProperties.get(name), Integer.class);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public long getLongProperty(String name) {
+        try {
+            return convertPropertyTo(name, messageProperties.get(name), Long.class);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public Object getObjectProperty(String name) {
+        return messageProperties.get(name);
+    }
+
+    @Override
+    public short getShortProperty(String name) {
+        try {
+            return convertPropertyTo(name, messageProperties.get(name), Short.class);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public String getStringProperty(String name) {
+        try {
+            return convertPropertyTo(name, messageProperties.get(name), String.class);
+        } catch (JMSException jmse) {
+            throw JmsExceptionSupport.createRuntimeException(jmse);
+        }
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, boolean value) {
+        return setObjectProperty(name, value);
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, byte value) {
+        return setObjectProperty(name, value);
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, double value) {
+        return setObjectProperty(name, value);
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, float value) {
+        return setObjectProperty(name, value);
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, int value) {
+        return setObjectProperty(name, value);
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, long value) {
+        return setObjectProperty(name, value);
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, Object value) {
+        return setObjectProperty(name, value);
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, short value) {
+        return setObjectProperty(name, value);
+    }
+
+    @Override
+    public JMSProducer setProperty(String name, String value) {
+        return setObjectProperty(name, value);
+    }
+
+    //----- Message Headers --------------------------------------------------//
+
+    @Override
+    public String getJMSCorrelationID() {
+        return correlationId;
+    }
+
+    @Override
+    public JMSProducer setJMSCorrelationID(String correlationId) {
+        this.correlationId = correlationId;
+        return this;
+    }
+
+    @Override
+    public byte[] getJMSCorrelationIDAsBytes() {
+        return correlationIdBytes;
+    }
+
+    @Override
+    public JMSProducer setJMSCorrelationIDAsBytes(byte[] correlationIdBytes) {
+        this.correlationIdBytes = correlationIdBytes;
+        return this;
+    }
+
+    @Override
+    public Destination getJMSReplyTo() {
+        return replyTo;
+    }
+
+    @Override
+    public JMSProducer setJMSReplyTo(Destination replyTo) {
+        try { // store the transformed destination so getJMSReplyTo() and doSend() can see it
+            this.replyTo = JmsMessageTransformation.transformDestination(session.getConnection(), replyTo);
+        } catch (JMSException e) { // rethrown unchecked: this signature declares no JMSException
+            throw JmsExceptionSupport.createRuntimeException(e);
+        }
+
+        return this; // returned to allow call chaining
+    }
+
+    @Override
+    public String getJMSType() {
+        return type;
+    }
+
+    @Override
+    public JMSProducer setJMSType(String type) {
+        this.type = type;
+        return this;
+    }
+
+    //----- Producer Send Configuration --------------------------------------//
+
+    @Override
+    public CompletionListener getAsync() {
+        return completionListener;
+    }
+
+    @Override
+    public JMSProducer setAsync(CompletionListener completionListener) {
+        this.completionListener = completionListener;
+        return this;
+    }
+
+    @Override
+    public long getDeliveryDelay() {
+        return deliveryDelay;
+    }
+
+    @Override
+    public JMSProducer setDeliveryDelay(long deliveryDelay) {
+        this.deliveryDelay = deliveryDelay;
+        return this;
+    }
+
+    @Override
+    public int getDeliveryMode() {
+        return deliveryMode;
+    }
+
+    @Override
+    public JMSProducer setDeliveryMode(int deliveryMode) {
+        this.deliveryMode = deliveryMode;
+        return this;
+    }
+
+    @Override
+    public boolean getDisableMessageID() {
+        return disableMessageId;
+    }
+
+    @Override
+    public JMSProducer setDisableMessageID(boolean disableMessageId) {
+        this.disableMessageId = disableMessageId;
+        return this;
+    }
+
+    @Override
+    public boolean getDisableMessageTimestamp() {
+        return disableTimestamp;
+    }
+
+    @Override
+    public JMSProducer setDisableMessageTimestamp(boolean disableTimestamp) {
+        this.disableTimestamp = disableTimestamp;
+        return this;
+    }
+
+    @Override
+    public int getPriority() {
+        return priority;
+    }
+
+    @Override
+    public JMSProducer setPriority(int priority) {
+        this.priority = priority;
+        return this;
+    }
+
+    @Override
+    public long getTimeToLive() {
+        return timeToLive;
+    }
+
+    @Override
+    public JMSProducer setTimeToLive(long timeToLive) {
+        this.timeToLive = timeToLive;
+        return this;
+    }
+
+    //----- Internal support methods -----------------------------------------//
+
+    private JMSProducer setObjectProperty(String name, Object value) {
+        try { // validate first; the property is stored only when both checks pass
+            checkPropertyNameIsValid(name, session.getConnection().isValidatePropertyNames());
+            checkValidObject(value);
+        } catch (JMSException cause) {
+            throw JmsExceptionSupport.createRuntimeException(cause);
+        }
+        messageProperties.put(name, value);
+        return this;
+    }
+}

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 817f342..e740ba7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.concurrent.locks.ReentrantLock;
 
 import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.IllegalStateException;
@@ -471,6 +472,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return result;
     }
 
+    /**
+     * @see javax.jms.Session#createDurableConsumer(javax.jms.Topic, java.lang.String)
+     */
+    @Override
+    public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
+        return createDurableSubscriber(topic, name, null, false);
+    }
+
+    /**
+     * @see javax.jms.Session#createDurableConsumer(javax.jms.Topic, java.lang.String, java.lang.String, boolean)
+     */
+    @Override
+    public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
+        return createDurableSubscriber(topic, name, messageSelector, noLocal);
+    }
+
     protected void checkClientIDWasSetExplicitly() throws IllegalStateException {
         if (!connection.isExplicitClientID()) {
             throw new IllegalStateException("You must specify a unique clientID for the Connection to use a DurableSubscriber");
@@ -486,6 +503,46 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         connection.unsubscribe(name);
     }
 
+    /**
+     * @see javax.jms.Session#createSharedConsumer(javax.jms.Topic, java.lang.String)
+     */
+    @Override
+    public MessageConsumer createSharedConsumer(Topic topic, String name) throws JMSException {
+        checkClosed();
+        // TODO Auto-generated method stub
+        throw new JMSException("Not yet implemented");
+    }
+
+    /**
+     * @see javax.jms.Session#createSharedConsumer(javax.jms.Topic, java.lang.String, java.lang.String)
+     */
+    @Override
+    public MessageConsumer createSharedConsumer(Topic topic, String name, String selector) throws JMSException {
+        checkClosed();
+        // TODO Auto-generated method stub
+        throw new JMSException("Not yet implemented");
+    }
+
+    /**
+     * @see javax.jms.Session#createSharedDurableConsumer(javax.jms.Topic, java.lang.String)
+     */
+    @Override
+    public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
+        checkClosed();
+        // TODO Auto-generated method stub
+        throw new JMSException("Not yet implemented");
+    }
+
+    /**
+     * @see javax.jms.Session#createSharedDurableConsumer(javax.jms.Topic, java.lang.String, java.lang.String)
+     */
+    @Override
+    public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String selector) throws JMSException {
+        checkClosed();
+        // TODO Auto-generated method stub
+        throw new JMSException("Not yet implemented");
+    }
+
     //////////////////////////////////////////////////////////////////////////
     // Producer creation
     //////////////////////////////////////////////////////////////////////////
@@ -653,17 +710,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         connection.onException(ex);
     }
 
-    protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException {
+    protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
         JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest);
 
         if (destination.isTemporary() && ((JmsTemporaryDestination) destination).isDeleted()) {
             throw new IllegalStateException("Temporary destination has been deleted");
         }
 
-        send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp, listener);
+        send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp, deliveryDelay, listener);
     }
 
-    private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException {
+    private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
         sendLock.lock();
         try {
             original.setJMSDeliveryMode(deliveryMode);
@@ -672,7 +729,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
             original.setJMSDestination(destination);
 
             long timeStamp = System.currentTimeMillis();
-            boolean hasTTL = timeToLive > 0;
+            boolean hasTTL = timeToLive > Message.DEFAULT_TIME_TO_LIVE;
+            boolean hasDelay = deliveryDelay > Message.DEFAULT_DELIVERY_DELAY;
 
             if (!disableTimestamp) {
                 original.setJMSTimestamp(timeStamp);
@@ -686,6 +744,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
                 original.setJMSExpiration(0);
             }
 
+            if (hasDelay) {
+                original.setJMSDeliveryTime(timeStamp + deliveryDelay); // send time plus the delivery delay, not the TTL
+            } else {
+                original.setJMSDeliveryTime(0); // deliveryDelay does not exceed Message.DEFAULT_DELIVERY_DELAY
+            }
+
             boolean isJmsMessage = original instanceof JmsMessage;
 
             long messageSequence = producer.getNextMessageSequence();
@@ -966,6 +1030,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
         return sessionInfo.getId();
     }
 
+    protected int getSessionMode() {
+        return acknowledgementMode; // the session mode is whatever this session's acknowledgementMode field holds
+    }
+
     protected JmsConsumerId getNextConsumerId() {
         return new JmsConsumerId(sessionInfo.getId(), consumerIdGenerator.incrementAndGet());
     }
@@ -1272,12 +1340,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
     private final class SendCompletion {
 
         private final JmsOutboundMessageDispatch envelope;
-        private final JmsCompletionListener listener;
+        private final CompletionListener listener;
 
         private Exception failureCause;
         private boolean completed;
 
-        public SendCompletion(JmsOutboundMessageDispatch envelope, JmsCompletionListener listener) {
+        public SendCompletion(JmsOutboundMessageDispatch envelope, CompletionListener listener) {
             this.envelope = envelope;
             this.listener = listener;
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
index 48493bd..5f255f2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
@@ -16,9 +16,29 @@
  */
 package org.apache.qpid.jms.exceptions;
 
+import javax.jms.IllegalStateException;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.InvalidClientIDException;
+import javax.jms.InvalidClientIDRuntimeException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidDestinationRuntimeException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.InvalidSelectorRuntimeException;
 import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.JMSSecurityException;
+import javax.jms.JMSSecurityRuntimeException;
 import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageNotWriteableRuntimeException;
+import javax.jms.ResourceAllocationException;
+import javax.jms.ResourceAllocationRuntimeException;
+import javax.jms.TransactionInProgressException;
+import javax.jms.TransactionInProgressRuntimeException;
+import javax.jms.TransactionRolledBackException;
+import javax.jms.TransactionRolledBackRuntimeException;
 
 /**
  * Exception support class.
@@ -143,4 +163,50 @@ public final class JmsExceptionSupport {
         exception.initCause(cause);
         return exception;
     }
+
+    /**
+     * Creates the proper instance of a JMSRuntimeException based on the type
+     * of Exception that is passed.  An exception that is not a JMSException is
+     * wrapped in a plain JMSRuntimeException (null error code) as its cause.
+     *
+     * @param exception
+     *      The Exception instance to convert to a JMSRuntimeException
+     * @return a new {@link JMSRuntimeException} instance that reflects the original error.
+     */
+    public static JMSRuntimeException createRuntimeException(Exception exception) {
+        JMSRuntimeException result = null;
+        JMSException source = null;
+        // This is a factory: hand the wrapper back and let the caller decide whether to throw it.
+        if (!(exception instanceof JMSException)) {
+            return new JMSRuntimeException(exception.getMessage(), null, exception);
+        } else {
+            source = (JMSException) exception;
+        }
+        // Map each checked JMS exception onto its unchecked counterpart, keeping message, code and cause.
+        if (source instanceof IllegalStateException) {
+            result = new IllegalStateRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof InvalidClientIDException) {
+            result = new InvalidClientIDRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof InvalidDestinationException) {
+            result = new InvalidDestinationRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof InvalidSelectorException) {
+            result = new InvalidSelectorRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof JMSSecurityException) {
+            result = new JMSSecurityRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof MessageFormatException) {
+            result = new MessageFormatRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof MessageNotWriteableException) {
+            result = new MessageNotWriteableRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof ResourceAllocationException) {
+            result = new ResourceAllocationRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof TransactionInProgressException) {
+            result = new TransactionInProgressRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof TransactionRolledBackException) {
+            result = new TransactionRolledBackRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else {
+            result = new JMSRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        }
+
+        return result;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
index 95e51e0..75bc30c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
@@ -28,6 +28,7 @@ import javax.jms.MessageFormatException;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
 
+@SuppressWarnings("unchecked")
 public class JmsBytesMessage extends JmsMessage implements BytesMessage {
 
     protected transient DataOutputStream dataOut;
@@ -396,6 +397,20 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
         return "JmsBytesMessage { " + facade + " }";
     }
 
+    @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        return !facade.hasBody() || target.isAssignableFrom(byte[].class); // any type is accepted while there is no body
+    }
+
+    @Override
+    protected <T> T doGetBody(Class<T> asType) throws JMSException {
+        // A message without a body yields null regardless of the requested type.
+        if (facade.hasBody()) {
+            return (T) facade.copyBody();
+        }
+        return null;
+    }
+
     private void initializeWriting() throws JMSException {
         checkReadOnlyBody();
         if (this.dataOut == null) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
index add21f1..4ec02fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
@@ -17,6 +17,8 @@
 package org.apache.qpid.jms.message;
 
 import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
 
 import javax.jms.JMSException;
 import javax.jms.MapMessage;
@@ -27,6 +29,7 @@ import org.apache.qpid.jms.message.facade.JmsMapMessageFacade;
 /**
  * Implementation of the JMS MapMessage.
  */
+@SuppressWarnings("unchecked")
 public class JmsMapMessage extends JmsMessage implements MapMessage {
 
     JmsMapMessageFacade facade;
@@ -299,6 +302,27 @@ public class JmsMapMessage extends JmsMessage implements MapMessage {
         return "JmsMapMessage { " + facade + " }";
     }
 
+    @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        return !facade.hasBody() || target.isAssignableFrom(Map.class); // any type is accepted while there is no body
+    }
+
+    @Override
+    protected <T> T doGetBody(Class<T> asType) throws JMSException {
+        if (!facade.hasBody()) {
+            return null;
+        }
+
+        // Snapshot every entry into a fresh map that is handed to the caller.
+        final Map<String, Object> snapshot = new HashMap<String, Object>();
+        for (Enumeration<String> names = facade.getMapNames(); names.hasMoreElements();) {
+            final String entryName = names.nextElement();
+            snapshot.put(entryName, getObject(entryName));
+        }
+
+        return (T) snapshot;
+    }
+
     private void put(String name, Object value) throws JMSException {
         checkReadOnlyBody();
         checkKeyNameIsValid(name);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
index 68db061..05143d8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
@@ -16,6 +16,8 @@
  */
 package org.apache.qpid.jms.message;
 
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.convertPropertyTo;
+
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.HashSet;
@@ -32,7 +34,6 @@ import org.apache.qpid.jms.JmsAcknowledgeCallback;
 import org.apache.qpid.jms.JmsConnection;
 import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
 import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.util.TypeConversionSupport;
 
 public class JmsMessage implements javax.jms.Message {
 
@@ -103,6 +104,24 @@ public class JmsMessage implements javax.jms.Message {
     }
 
     @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        return true; // base message places no restriction on the requested body type
+    }
+
+    @Override
+    public final <T> T getBody(Class<T> asType) throws JMSException {
+        if (!isBodyAssignableTo(asType)) {
+            throw new MessageFormatException("Message body cannot be read as type: " + asType);
+        }
+        // The type check passed, so the actual body lookup is left to doGetBody.
+        return doGetBody(asType);
+    }
+
+    protected <T> T doGetBody(Class<T> asType) throws JMSException {
+        return null; // base message hands back no body; getBody() calls this once the type check has passed
+    }
+
+    @Override
     public void clearBody() throws JMSException {
         readOnlyBody = false;
         facade.clearBody();
@@ -248,6 +267,16 @@ public class JmsMessage implements javax.jms.Message {
     }
 
     @Override
+    public long getJMSDeliveryTime() throws JMSException {
+        return facade.getDeliveryTime(); // the value is read straight from the message facade
+    }
+
+    @Override
+    public void setJMSDeliveryTime(long deliveryTime) throws JMSException {
+        facade.setDeliveryTime(deliveryTime); // the value is written straight to the message facade, unvalidated here
+    }
+
+    @Override
     public void clearProperties() throws JMSException {
         JmsMessagePropertyIntercepter.clearProperties(this, true);
     }
@@ -288,106 +317,42 @@ public class JmsMessage implements javax.jms.Message {
 
     @Override
     public boolean getBooleanProperty(String name) throws JMSException {
-        Object value = getObjectProperty(name);
-        if (value == null) {
-            return false;
-        }
-        Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
-        if (rc == null) {
-            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
-        }
-        return rc.booleanValue();
+        return convertPropertyTo(name, getObjectProperty(name), Boolean.class);
     }
 
     @Override
     public byte getByteProperty(String name) throws JMSException {
-        Object value = getObjectProperty(name);
-        if (value == null) {
-            throw new NumberFormatException("property " + name + " was null");
-        }
-        Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class);
-        if (rc == null) {
-            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
-        }
-        return rc.byteValue();
+        return convertPropertyTo(name, getObjectProperty(name), Byte.class);
     }
 
     @Override
     public short getShortProperty(String name) throws JMSException {
-        Object value = getObjectProperty(name);
-        if (value == null) {
-            throw new NumberFormatException("property " + name + " was null");
-        }
-        Short rc = (Short) TypeConversionSupport.convert(value, Short.class);
-        if (rc == null) {
-            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
-        }
-        return rc.shortValue();
+        return convertPropertyTo(name, getObjectProperty(name), Short.class);
     }
 
     @Override
     public int getIntProperty(String name) throws JMSException {
-        Object value = getObjectProperty(name);
-        if (value == null) {
-            throw new NumberFormatException("property " + name + " was null");
-        }
-        Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
-        if (rc == null) {
-            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
-        }
-        return rc.intValue();
+        return convertPropertyTo(name, getObjectProperty(name), Integer.class);
     }
 
     @Override
     public long getLongProperty(String name) throws JMSException {
-        Object value = getObjectProperty(name);
-        if (value == null) {
-            throw new NumberFormatException("property " + name + " was null");
-        }
-        Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
-        if (rc == null) {
-            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
-        }
-        return rc.longValue();
+        return convertPropertyTo(name, getObjectProperty(name), Long.class);
     }
 
     @Override
     public float getFloatProperty(String name) throws JMSException {
-        Object value = getObjectProperty(name);
-        if (value == null) {
-            throw new NullPointerException("property " + name + " was null");
-        }
-        Float rc = (Float) TypeConversionSupport.convert(value, Float.class);
-        if (rc == null) {
-            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
-        }
-        return rc.floatValue();
+        return convertPropertyTo(name, getObjectProperty(name), Float.class);
     }
 
     @Override
     public double getDoubleProperty(String name) throws JMSException {
-        Object value = getObjectProperty(name);
-        if (value == null) {
-            throw new NullPointerException("property " + name + " was null");
-        }
-        Double rc = (Double) TypeConversionSupport.convert(value, Double.class);
-        if (rc == null) {
-            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
-        }
-        return rc.doubleValue();
+        return convertPropertyTo(name, getObjectProperty(name), Double.class);
     }
 
     @Override
     public String getStringProperty(String name) throws JMSException {
-        Object value = getObjectProperty(name);
-        if (value == null) {
-            return null;
-        }
-        String rc = (String) TypeConversionSupport.convert(value, String.class);
-        if (rc == null) {
-            throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String");
-        }
-        return rc;
+        return convertPropertyTo(name, getObjectProperty(name), String.class);
     }
 
     @Override


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org