You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2013/11/20 15:34:56 UTC

svn commit: r1543828 [1/3] - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/engine/ main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/engine/ test/java/org/apache/qpid/jms/impl/ test/java/org/apache...

Author: robbie
Date: Wed Nov 20 14:34:55 2013
New Revision: 1543828

URL: http://svn.apache.org/r1543828
Log:
QPIDJMS-9: initial work on Message implementation and mapping to/from AMQP messages

- Add some initial stub Message implementations
- Initial support constructing appropriate Message type instances based on the incoming AMQP payload
- Add abilities to TestAmqpPeer for sending and matching additional message sections such as MessageAnnotations, Properties, and Data.
- Add some integration tests for sending/receiving TextMessage and BytesMessage

Added:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java
      - copied, changed from r1529205, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/engine/AmqpMessageFactoryTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/MessageFactoryImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/MapDescribedType.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/AmqpValueDescribedType.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/DataDescribedType.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/HeaderDescribedType.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/MessageAnnotationsDescribedType.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/PropertiesDescribedType.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/describedtypes/sections/generate-list-sections.xsl
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/AbstractMessageSectionMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageAnnotationsSectionMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageHeaderSectionMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageListSectionMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessageMapSectionMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/MessagePropertiesSectionMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/TransferPayloadCompositeMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/sections/generate-message-section-matchers.xsl
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpTypeMatcher.java
      - copied, changed from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/EncodedAmqpValueMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedAmqpValueMatcher.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/matchers/types/EncodedDataMatcher.java
Removed:
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/EncodedAmqpValueMatcher.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/ReceiverImplTest.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/AbstractFrameFieldAndPayloadMatchingHandler.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/ListDescribedType.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeerRunner.java

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpBytesMessage.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.engine;
+
+import java.io.ByteArrayInputStream;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpBytesMessage extends AmqpMessage
+{
+    public static final String CONTENT_TYPE = "application/octet-stream";
+    private long _length;
+
+    public AmqpBytesMessage()
+    {
+        super();
+        setContentType(CONTENT_TYPE);
+    }
+
+    public AmqpBytesMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+    {
+        super(message, delivery, amqpConnection);
+    }
+
+    public void setBytes(byte[] bytes)
+    {
+        getMessage().setBody(new Data(new Binary(bytes)));
+    }
+
+    public ByteArrayInputStream getByteArrayInputStream()
+    {
+        Section body = getMessage().getBody();
+
+        if(body == null)
+        {
+            _length = 0;
+            return createEmptyByteArrayInputStream();
+        }
+        else if(body instanceof AmqpValue)
+        {
+            Object value = ((AmqpValue) body).getValue();
+
+            if(value == null)
+            {
+                _length = 0;
+                return createEmptyByteArrayInputStream();
+            }
+            if(value instanceof Binary)
+            {
+                Binary b = (Binary)value;
+                _length = b.getLength();
+                return new ByteArrayInputStream(b.getArray(), b.getArrayOffset(), b.getLength());
+            }
+            else
+            {
+                throw new RuntimeException("Unexpected body content type: " + value.getClass().getSimpleName());
+            }
+        }
+        else if(body instanceof Data)
+        {
+            Binary b = ((Data) body).getValue();
+            _length = b.getLength();
+            return new ByteArrayInputStream(b.getArray(), b.getArrayOffset(), b.getLength());
+        }
+        else
+        {
+            throw new RuntimeException("Unexpected message body type: " + body.getClass().getSimpleName());
+        }
+    }
+
+    private ByteArrayInputStream createEmptyByteArrayInputStream()
+    {
+        return new ByteArrayInputStream(new byte[0]);
+    }
+
+    public long getBytesLength()
+    {
+        getByteArrayInputStream();
+        return _length;
+    }
+
+    //TODO: methods to access/set content
+}

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java (from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpGenericMessage.java Wed Nov 20 14:34:55 2013
@@ -16,27 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.qpid.jms;
+package org.apache.qpid.jms.engine;
 
-import static org.junit.Assert.assertNull;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.impl.ConnectionImpl;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-
-public class IntegrationTestFixture
+public class AmqpGenericMessage extends AmqpMessage
 {
-    static final int PORT = 25672;
-
-    Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException
+    public AmqpGenericMessage()
     {
-        testPeer.expectPlainConnect("guest", "guest", true);
-
-        Connection connection =  new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
+        super();
+    }
 
-        assertNull(testPeer.getException());
-        return connection;
+    public AmqpGenericMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+    {
+        super(message, delivery, amqpConnection);
     }
+
+    //TODO: add some methods to access the body in some fashion?
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpLink.java Wed Nov 20 14:34:55 2013
@@ -37,11 +37,11 @@ public abstract class AmqpLink
     private boolean _closed;
 
 
-    public AmqpLink(AmqpSession amqpSession, Link protonLink)
+    public AmqpLink(AmqpSession amqpSession, Link protonLink, AmqpConnection amqpConnection)
     {
         _amqpSession = amqpSession;
         _protonLink = protonLink;
-        _amqpConnection = _amqpSession.getAmqpConnection();
+        _amqpConnection = amqpConnection;
     }
 
     public boolean isEstablished()

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java (from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpListMessage.java Wed Nov 20 14:34:55 2013
@@ -16,27 +16,22 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.qpid.jms;
+package org.apache.qpid.jms.engine;
 
-import static org.junit.Assert.assertNull;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.impl.ConnectionImpl;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-
-public class IntegrationTestFixture
+public class AmqpListMessage extends AmqpMessage
 {
-    static final int PORT = 25672;
-
-    Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException
+    public AmqpListMessage()
     {
-        testPeer.expectPlainConnect("guest", "guest", true);
-
-        Connection connection =  new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
+        super();
+    }
 
-        assertNull(testPeer.getException());
-        return connection;
+    public AmqpListMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+    {
+        super(message, delivery, amqpConnection);
     }
+
+    //TODO: methods to access/set content
 }

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java (from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMapMessage.java Wed Nov 20 14:34:55 2013
@@ -16,27 +16,20 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.qpid.jms;
+package org.apache.qpid.jms.engine;
 
-import static org.junit.Assert.assertNull;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.impl.ConnectionImpl;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-
-public class IntegrationTestFixture
+public class AmqpMapMessage extends AmqpMessage
 {
-    static final int PORT = 25672;
-
-    Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException
+    public AmqpMapMessage()
     {
-        testPeer.expectPlainConnect("guest", "guest", true);
-
-        Connection connection =  new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
+        super();
+    }
 
-        assertNull(testPeer.getException());
-        return connection;
+    public AmqpMapMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+    {
+        super(message, delivery, amqpConnection);
     }
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessage.java Wed Nov 20 14:34:55 2013
@@ -20,9 +20,12 @@
  */
 package org.apache.qpid.jms.engine;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.messaging.Accepted;
-import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.MessageAnnotations;
 import org.apache.qpid.proton.engine.Delivery;
 import org.apache.qpid.proton.engine.impl.DeliveryImpl;
 import org.apache.qpid.proton.message.Message;
@@ -31,28 +34,40 @@ import org.apache.qpid.proton.message.Me
  * Thread-safe (all state is guarded by the corresponding {@link AmqpConnection} monitor)
  *
  */
-public class AmqpMessage
+public abstract class AmqpMessage
 {
+    public static final String MESSAGE_ANNOTATION_TYPE_KEY_NAME = "x-opt-jms-message-type";
 
-    private final AmqpReceiver _amqpReceiver;
     private final Delivery _delivery;
     private final Message _message;
+    private final AmqpConnection _amqpConnection;
+
+    private volatile MessageAnnotations _messageAnnotations;
+    private volatile Map<Object,Object> _messageAnnotationsMap;
 
-    public AmqpMessage(Delivery delivery, Message message, AmqpReceiver amqpReceiver)
+    /**
+     * Used when creating a message that we intend to send
+     */
+    public AmqpMessage()
     {
-        _delivery = delivery;
-        _amqpReceiver = amqpReceiver;
-        _message = message;
+        this(Proton.message(), null, null);
     }
 
     /**
-     * Currently used when creating a message that we intend to send
+     * Used when creating a message that has been received
      */
-    public AmqpMessage()
+    @SuppressWarnings("unchecked")
+    public AmqpMessage(Message message, Delivery delivery, AmqpConnection amqpConnection)
     {
-        _message = Proton.message();
-        _amqpReceiver = null;
-        _delivery = null;
+        _delivery = delivery;
+        _amqpConnection = amqpConnection;
+        _message = message;
+
+        _messageAnnotations = _message.getMessageAnnotations();
+        if(_messageAnnotations != null)
+        {
+            _messageAnnotationsMap = _messageAnnotations.getValue();
+        }
     }
 
     Message getMessage()
@@ -62,19 +77,19 @@ public class AmqpMessage
 
     public void accept(boolean settle)
     {
-        synchronized (_amqpReceiver.getAmqpConnection())
+        synchronized (_amqpConnection)
         {
             _delivery.disposition(Accepted.getInstance());
             if(settle)
             {
-                _delivery.settle();
+                settle();
             }
         }
     }
 
     public void settle()
     {
-        synchronized (_amqpReceiver.getAmqpConnection())
+        synchronized (_amqpConnection)
         {
             _delivery.settle();
         }
@@ -88,15 +103,78 @@ public class AmqpMessage
      */
     public boolean isSettled()
     {
-        synchronized (_amqpReceiver.getAmqpConnection())
+        synchronized (_amqpConnection)
         {
             return _delivery.isSettled() || ((_delivery instanceof DeliveryImpl && ((DeliveryImpl)_delivery).remotelySettled()));
         }
     }
 
-    public void setText(String string)
+    public void setContentType(String contentType)
+    {
+        //TODO: do we need to synchronise this?
+        _message.setContentType(contentType);
+    }
+
+    public boolean messageAnnotationExists(Object key)
+    {
+        //TODO: this isn't thread-safe, does it need to be?
+        Map<Object,Object> msgAnnotations = _messageAnnotationsMap;
+        if(msgAnnotations == null)
+        {
+            return false;
+        }
+
+        return msgAnnotations.containsKey(key);
+    }
+
+    public void clearMessageAnnotation(Object key)
+    {
+        //TODO: this isnt thread-safe, does it need to be?
+        if(_messageAnnotationsMap == null)
+        {
+            return;
+        }
+
+        _messageAnnotationsMap.remove(key);
+
+        //If there are now no annotations, clear the field on
+        //the Proton message to avoid encoding an empty map
+        if(_messageAnnotationsMap.isEmpty())
+        {
+            clearAllMessageAnnotations();
+        }
+    }
+
+    public void clearAllMessageAnnotations()
+    {
+        //TODO: this isnt thread-safe, does it need to be?
+        _messageAnnotations = null;
+        _message.setMessageAnnotations(null);
+    }
+
+    public void setMessageAnnotation(Object key, Object value)
+    {
+        //TODO: this isnt thread-safe, does it need to be?
+        if(_messageAnnotationsMap == null)
+        {
+            _messageAnnotationsMap = new HashMap<Object,Object>();
+        }
+
+        _messageAnnotationsMap.put(key, value);
+
+        //If there were previously no annotations, we need to
+        //set the related field on the Proton message now
+        if(_messageAnnotations == null)
+        {
+            setMessageAnnotations();
+        }
+    }
+
+    private void setMessageAnnotations()
     {
-        AmqpValue body = new AmqpValue(string);
-        _message.setBody(body);
+        //TODO: this isnt thread-safe, does it need to be?
+        _messageAnnotations = new MessageAnnotations(_messageAnnotationsMap);
+        _message.setMessageAnnotations(_messageAnnotations);
     }
+
 }

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpMessageFactory.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.engine;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpMessageFactory
+{
+    //TODO: use switch statements
+    @SuppressWarnings("unchecked")
+    AmqpMessage createAmqpMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+    {
+        Section body = message.getBody();
+
+        Map<Object,Object> messageAnnotationsMap = null;
+        if(message.getMessageAnnotations() != null)
+        {
+            messageAnnotationsMap = message.getMessageAnnotations().getValue();
+        };
+
+        if(body == null)
+        {
+            if(isJMSMessageType(AmqpTextMessage.MSG_TYPE_ANNOTATION_VALUE, messageAnnotationsMap))
+            {
+                return new AmqpTextMessage(delivery, message, amqpConnection);
+            }
+            else if(isContentType(AmqpTextMessage.CONTENT_TYPE, message))
+            {
+                return new AmqpTextMessage(delivery, message, amqpConnection);
+            }
+            else if(isContentType(AmqpObjectMessage.CONTENT_TYPE, message))
+            {
+                return new AmqpObjectMessage(delivery, message, amqpConnection);
+            }
+            else if(isContentType(AmqpBytesMessage.CONTENT_TYPE, message) || isContentType(null, message))
+            {
+                return new AmqpBytesMessage(delivery, message, amqpConnection);
+            }
+        }
+        else if(body instanceof Data)
+        {
+            if(isContentType(AmqpTextMessage.CONTENT_TYPE, message))
+            {
+                return new AmqpTextMessage(delivery, message, amqpConnection);
+            }
+            else if(isContentType(AmqpBytesMessage.CONTENT_TYPE, message)  || isContentType(null, message))
+            {
+                return new AmqpBytesMessage(delivery, message, amqpConnection);
+            }
+            else if(isContentType(AmqpObjectMessage.CONTENT_TYPE, message))
+            {
+                return new AmqpObjectMessage(delivery, message, amqpConnection);
+            }
+        }
+        else if(body instanceof AmqpValue)
+        {
+            Object value = ((AmqpValue) body).getValue();
+
+            if(value == null)
+            {
+                if(isJMSMessageType(AmqpTextMessage.MSG_TYPE_ANNOTATION_VALUE, messageAnnotationsMap))
+                {
+                    return new AmqpTextMessage(delivery, message, amqpConnection);
+                }
+            }
+            else if(value instanceof String)
+            {
+                return new AmqpTextMessage(delivery, message, amqpConnection);
+            }
+            else if(value instanceof Map)
+            {
+                return new AmqpMapMessage(delivery, message, amqpConnection);
+            }
+            else if(value instanceof List)
+            {
+                return new AmqpListMessage(delivery, message, amqpConnection);
+            }
+            else if(value instanceof Binary)
+            {
+                return new AmqpBytesMessage(delivery, message, amqpConnection);
+            }
+        }
+
+        //Unable to determine a specific message type, return the generic message
+        return new AmqpGenericMessage(delivery, message, amqpConnection);
+    }
+
+    private boolean isJMSMessageType(String messageType, Map<Object, Object> messageAnnotationsMap)
+    {
+        Symbol key = Symbol.valueOf(AmqpMessage.MESSAGE_ANNOTATION_TYPE_KEY_NAME);
+        Object value = getAnnotation(messageAnnotationsMap, key);
+
+        return messageType.equals(value);
+    }
+
+    private Object getAnnotation(Map<Object,Object> annotations, Symbol symbolKey)
+    {
+        if(annotations == null || !annotations.containsKey(symbolKey))
+        {
+            return null;
+        }
+
+        return annotations.get(symbolKey);
+    }
+
+    private boolean isContentType(String contentType, Message message)
+    {
+        if(contentType == null)
+        {
+            return message.getContentType() == null;
+        }
+        else
+        {
+            return contentType.equals(message.getContentType());
+        }
+    }
+
+}

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java (from r1529205, qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java&p1=qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpObjectMessage.java Wed Nov 20 14:34:55 2013
@@ -16,27 +16,24 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-package org.apache.qpid.jms;
+package org.apache.qpid.jms.engine;
 
-import static org.junit.Assert.assertNull;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
 
-import javax.jms.Connection;
-import javax.jms.JMSException;
-
-import org.apache.qpid.jms.impl.ConnectionImpl;
-import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
-
-public class IntegrationTestFixture
+public class AmqpObjectMessage extends AmqpMessage
 {
-    static final int PORT = 25672;
+    public static final String CONTENT_TYPE = "application/x-java-serialized-object";
 
-    Connection establishConnecton(TestAmqpPeer testPeer) throws JMSException
+    public AmqpObjectMessage()
     {
-        testPeer.expectPlainConnect("guest", "guest", true);
-
-        Connection connection =  new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
+        super();
+    }
 
-        assertNull(testPeer.getException());
-        return connection;
+    public AmqpObjectMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+    {
+        super(message, delivery, amqpConnection);
     }
+
+    //TODO: methods to access/set content
 }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpReceiver.java Wed Nov 20 14:34:55 2013
@@ -29,9 +29,9 @@ public class AmqpReceiver extends AmqpLi
     private Receiver _protonReceiver;
     private byte[] _buffer = new byte[1024];
 
-    public AmqpReceiver(AmqpSession amqpSession, Receiver protonReceiver)
+    public AmqpReceiver(AmqpSession amqpSession, Receiver protonReceiver, AmqpConnection amqpConnection)
     {
-        super(amqpSession, protonReceiver);
+        super(amqpSession, protonReceiver, amqpConnection);
         _protonReceiver = protonReceiver;
     }
 
@@ -74,10 +74,12 @@ public class AmqpReceiver extends AmqpLi
                                 break;
                             }
                         }
+
                         Message message = getAmqpConnection().getMessageFactory().createMessage();
                         message.decode(_buffer, 0, total);
 
-                        AmqpMessage amqpMessage = new AmqpMessage(currentDelivery, message, this);
+                        //TODO: dont create a new factory for every message
+                        AmqpMessage amqpMessage = new AmqpMessageFactory().createAmqpMessage(currentDelivery, message, getAmqpConnection());
                         currentDelivery.setContext(amqpMessage);
                         _protonReceiver.advance();
                         return amqpMessage;

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSender.java Wed Nov 20 14:34:55 2013
@@ -32,9 +32,9 @@ public class AmqpSender extends AmqpLink
     private byte[] _buffer = new byte[1024];
     private final Sender _protonSender;
 
-    public AmqpSender(AmqpSession amqpSession, Sender protonSender)
+    public AmqpSender(AmqpSession amqpSession, Sender protonSender, AmqpConnection amqpConnection)
     {
-        super(amqpSession, protonSender);
+        super(amqpSession, protonSender, amqpConnection);
         _protonSender = protonSender;
     }
 
@@ -46,7 +46,7 @@ public class AmqpSender extends AmqpLink
             byte[] bufferBytes = new byte[8];
             ByteBuffer buffer = ByteBuffer.wrap(bufferBytes);
 
-            buffer.putLong(0,tag++);
+            buffer.putLong(0, tag++);
 
             Delivery del = _protonSender.delivery(bufferBytes);
 
@@ -66,10 +66,10 @@ public class AmqpSender extends AmqpLink
             _protonSender.send(_buffer, 0, encoded);
             _protonSender.advance();
 
-            AmqpSentMessageToken amqpSentMessage = new AmqpSentMessageToken(del, this);
-            del.setContext(amqpSentMessage);
+            AmqpSentMessageToken amqpSentMessageToken = new AmqpSentMessageToken(del, this);
+            del.setContext(amqpSentMessageToken);
 
-            return amqpSentMessage;
+            return amqpSentMessageToken;
         }
     }
 

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpSession.java Wed Nov 20 14:34:55 2013
@@ -39,9 +39,9 @@ public class AmqpSession
 
     private boolean _closed;
 
-    public AmqpSession(AmqpConnection amqpConn, Session protonSession)
+    public AmqpSession(AmqpConnection amqpConnection, Session protonSession)
     {
-        _amqpConnection = amqpConn;
+        _amqpConnection = amqpConnection;
         _protonSession = protonSession;
     }
 
@@ -90,7 +90,7 @@ public class AmqpSession
         protonSender.setSenderSettleMode(SenderSettleMode.UNSETTLED);
         protonSender.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
-        AmqpSender amqpSender = new AmqpSender(this, protonSender);
+        AmqpSender amqpSender = new AmqpSender(this, protonSender, _amqpConnection);
         protonSender.setContext(amqpSender);
         protonSender.open();
         _amqpConnection.addPendingLink(protonSender);
@@ -113,7 +113,7 @@ public class AmqpSession
         protonReceiver.setSenderSettleMode(SenderSettleMode.UNSETTLED);
         protonReceiver.setReceiverSettleMode(ReceiverSettleMode.FIRST);
 
-        AmqpReceiver amqpReceiver = new AmqpReceiver(this, protonReceiver);
+        AmqpReceiver amqpReceiver = new AmqpReceiver(this, protonReceiver, _amqpConnection);
         protonReceiver.setContext(amqpReceiver);
         protonReceiver.open();
         _amqpConnection.addPendingLink(protonReceiver);

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/engine/AmqpTextMessage.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.engine;
+
+import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.messaging.AmqpValue;
+import org.apache.qpid.proton.amqp.messaging.Data;
+import org.apache.qpid.proton.amqp.messaging.Section;
+import org.apache.qpid.proton.engine.Delivery;
+import org.apache.qpid.proton.message.Message;
+
+public class AmqpTextMessage extends AmqpMessage
+{
+    /**
+     * Content type, only to be used when message uses a data
+     * body section, and not when using an amqp-value body section
+     */
+    public static final String CONTENT_TYPE = "text/plain";
+
+    /**
+     * Message type annotation value, only to be used when message uses
+     * an amqp-value body section containing a null value, not otherwise.
+     */
+    public static final String MSG_TYPE_ANNOTATION_VALUE = "TextMessage";
+
+    public AmqpTextMessage()
+    {
+        super();
+        setText(null);
+    }
+
+    public AmqpTextMessage(Delivery delivery, Message message, AmqpConnection amqpConnection)
+    {
+        super(message, delivery, amqpConnection);
+    }
+
+    public void setText(String text)
+    {
+        AmqpValue body = new AmqpValue(text);
+        getMessage().setBody(body);
+
+        Symbol msgTypeAnnotationKey = Symbol.valueOf(MESSAGE_ANNOTATION_TYPE_KEY_NAME);
+        if(text == null && !messageAnnotationExists(msgTypeAnnotationKey))
+        {
+            setMessageAnnotation(msgTypeAnnotationKey, MSG_TYPE_ANNOTATION_VALUE);
+        }
+        else if(text != null && messageAnnotationExists(msgTypeAnnotationKey))
+        {
+            clearMessageAnnotation(msgTypeAnnotationKey);
+        }
+    }
+
+    public String getText()
+    {
+        Section body = getMessage().getBody();
+
+        if(body == null)
+        {
+            return null;
+        }
+        else if(body instanceof Data)
+        {
+            //TODO
+            return null;
+        }
+        else if(body instanceof AmqpValue)
+        {
+            Object value = ((AmqpValue) body).getValue();
+
+            if(value == null || value instanceof String)
+            {
+                return (String) value;
+            }
+            else
+            {
+                throw new RuntimeException("Unexpected body content type: " + value.getClass().getSimpleName());
+            }
+        }
+        else
+        {
+            throw new RuntimeException("Unexpected message body type: " + body.getClass().getSimpleName());
+        }
+    }
+}

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/BytesMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,308 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.EOFException;
+import java.io.IOException;
+
+import javax.jms.BytesMessage;
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+
+import org.apache.qpid.jms.engine.AmqpBytesMessage;
+
+public class BytesMessageImpl extends MessageImpl<AmqpBytesMessage> implements BytesMessage
+{
+    private ByteArrayOutputStream _bytesOut;
+    private DataOutputStream _dataAsOutput;
+    private DataInputStream _dataIn;
+    private ByteArrayInputStream _bytesIn;
+
+    //message to be sent
+    public BytesMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        this(new AmqpBytesMessage(), sessionImpl, connectionImpl);
+        _bytesOut = new ByteArrayOutputStream();
+        _dataAsOutput = new DataOutputStream(_bytesOut);
+    }
+
+    //message just received
+    public BytesMessageImpl(AmqpBytesMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        super(amqpMessage, sessionImpl, connectionImpl);
+        _dataIn = new DataInputStream(amqpMessage.getByteArrayInputStream());
+    }
+
+    @Override
+    protected AmqpBytesMessage prepareUnderlyingAmqpMessageForSending(AmqpBytesMessage amqpMessage)
+    {
+        amqpMessage.setBytes(_bytesOut.toByteArray());
+
+        //TODO: do we need to do anything later with properties/headers etc?
+        return amqpMessage;
+    }
+
+    private JMSException handleInputException(final IOException e)
+    {
+        JMSException ex;
+        if(e instanceof EOFException)
+        {
+            ex = new MessageEOFException(e.getMessage());
+        }
+        else
+        {
+            ex = new MessageFormatException(e.getMessage());
+        }
+        ex.initCause(e);
+        ex.setLinkedException(e);
+        return ex;
+    }
+
+    private JMSException handleOutputException(final IOException e)
+    {
+        return new QpidJmsException(e.getMessage(), e);
+    }
+
+    //======= JMS Methods =======
+
+    @Override
+    public long getBodyLength() throws JMSException
+    {
+        return getUnderlyingAmqpMessage(false).getBytesLength();
+    }
+
+    @Override
+    public boolean readBoolean() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public byte readByte() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public int readUnsignedByte() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public short readShort() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public int readUnsignedShort() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public char readChar() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public int readInt() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public long readLong() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public float readFloat() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public double readDouble() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public String readUTF() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public int readBytes(byte[] value) throws JMSException
+    {
+        return readBytes(value, value.length);
+    }
+
+    @Override
+    public int readBytes(byte[] value, int length) throws JMSException
+    {
+        //TODO: checkReadable();
+
+        try
+        {
+            int offset = 0;
+            while(offset < length)
+            {
+                int read = _dataIn.read(value, offset, length - offset);
+                if(read < 0)
+                {
+                    break;
+                }
+                offset += read;
+            }
+
+            if(offset == 0 && length != 0)
+            {
+                return -1;
+            }
+            else
+            {
+                return offset;
+            }
+        }
+        catch (IOException e)
+        {
+            throw handleInputException(e);
+        }
+    }
+
+    @Override
+    public void writeBoolean(boolean value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeByte(byte value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeShort(short value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeChar(char value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeInt(int value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeLong(long value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeFloat(float value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeDouble(double value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeUTF(String value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeBytes(byte[] bytes) throws JMSException
+    {
+        //TODO: checkWritable();
+        try
+        {
+            _dataAsOutput.write(bytes);
+        }
+        catch (IOException e)
+        {
+            throw handleOutputException(e);
+        }
+    }
+
+    @Override
+    public void writeBytes(byte[] value, int offset, int length) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeObject(Object value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void reset() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+}

Copied: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java (from r1529205, qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java)
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java?p2=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java&p1=qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java&r1=1529205&r2=1543828&rev=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/GenericAmqpMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -19,32 +19,27 @@
 package org.apache.qpid.jms.impl;
 
 import javax.jms.JMSException;
-import javax.jms.TextMessage;
+import javax.jms.Message;
 
-public class TextMessageImpl extends MessageImpl implements TextMessage
+import org.apache.qpid.jms.engine.AmqpGenericMessage;
+
+public class GenericAmqpMessageImpl extends MessageImpl<AmqpGenericMessage> implements Message
 {
-    public TextMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    public GenericAmqpMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        super(sessionImpl, connectionImpl);
+        this(new AmqpGenericMessage(), sessionImpl, connectionImpl);
     }
 
-    public TextMessageImpl(String text, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    public GenericAmqpMessageImpl(AmqpGenericMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        this(sessionImpl, connectionImpl);
-        setText(text);
+        super(amqpMessage, sessionImpl, connectionImpl);
     }
 
     @Override
-    public String getText() throws JMSException
+    protected AmqpGenericMessage prepareUnderlyingAmqpMessageForSending(AmqpGenericMessage amqpMessage)
     {
-        // TODO Auto-generated method stub
+        //TODO
         throw new UnsupportedOperationException("Not Implemented");
     }
 
-    @Override
-    public void setText(String string) throws JMSException
-    {
-        getAmqpMessage().setText(string);
-    }
-
 }

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MapMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import java.util.Enumeration;
+
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+
+import org.apache.qpid.jms.engine.AmqpMapMessage;
+
+public class MapMessageImpl extends MessageImpl<AmqpMapMessage> implements MapMessage
+{
+    public MapMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        this(new AmqpMapMessage(), sessionImpl, connectionImpl);
+    }
+
+    public MapMessageImpl(AmqpMapMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        super(amqpMessage, sessionImpl, connectionImpl);
+    }
+
+    @Override
+    protected AmqpMapMessage prepareUnderlyingAmqpMessageForSending(AmqpMapMessage amqpMessage)
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+     //======= JMS Methods =======
+
+    @Override
+    public boolean getBoolean(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public byte getByte(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public short getShort(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public char getChar(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public int getInt(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public long getLong(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public float getFloat(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public double getDouble(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public String getString(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public byte[] getBytes(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public Object getObject(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @SuppressWarnings("rawtypes")
+    @Override
+    public Enumeration getMapNames() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setBoolean(String name, boolean value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setByte(String name, byte value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setShort(String name, short value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setChar(String name, char value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setInt(String name, int value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setLong(String name, long value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setFloat(String name, float value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setDouble(String name, double value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setString(String name, String value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setBytes(String name, byte[] value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setBytes(String name, byte[] value, int offset, int length)
+            throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void setObject(String name, Object value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public boolean itemExists(String name) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+}

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageFactoryImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,68 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.impl;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+
+import org.apache.qpid.jms.engine.AmqpBytesMessage;
+import org.apache.qpid.jms.engine.AmqpGenericMessage;
+import org.apache.qpid.jms.engine.AmqpListMessage;
+import org.apache.qpid.jms.engine.AmqpMapMessage;
+import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpObjectMessage;
+import org.apache.qpid.jms.engine.AmqpTextMessage;
+
+public class MessageFactoryImpl
+{
+    Message createJmsMessage(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        if(amqpMessage instanceof AmqpTextMessage)
+        {
+            return new TextMessageImpl((AmqpTextMessage) amqpMessage, sessionImpl, connectionImpl);
+        }
+        else if(amqpMessage instanceof AmqpBytesMessage)
+        {
+            return new BytesMessageImpl((AmqpBytesMessage) amqpMessage, sessionImpl, connectionImpl);
+        }
+        else if(amqpMessage instanceof AmqpObjectMessage)
+        {
+            return new ObjectMessageImpl((AmqpObjectMessage) amqpMessage, sessionImpl, connectionImpl);
+        }
+        else if(amqpMessage instanceof AmqpListMessage)
+        {
+            return new StreamMessageImpl((AmqpListMessage) amqpMessage, sessionImpl, connectionImpl);
+        }
+        else if(amqpMessage instanceof AmqpMapMessage)
+        {
+            return new MapMessageImpl((AmqpMapMessage) amqpMessage, sessionImpl, connectionImpl);
+        }
+        else if(amqpMessage instanceof AmqpGenericMessage)
+        {
+            return new GenericAmqpMessageImpl((AmqpGenericMessage) amqpMessage, sessionImpl, connectionImpl);
+        }
+        else
+        {
+            //TODO: support other message types
+            throw new QpidJmsException("Unknown Message Type");
+        }
+    }
+}

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/MessageImpl.java Wed Nov 20 14:34:55 2013
@@ -26,29 +26,31 @@ import javax.jms.Message;
 
 import org.apache.qpid.jms.engine.AmqpMessage;
 
-public class MessageImpl implements Message
+public abstract class MessageImpl<T extends AmqpMessage> implements Message
 {
-    private final AmqpMessage _amqpMessage;
-    private final SessionImpl _sessionImpl;
-    private final ConnectionImpl _connectionImpl;
+    private final T _amqpMessage;
 
-    public MessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl)
-    {
-        this(new AmqpMessage(), sessionImpl, connectionImpl);
-    }
-
-    public MessageImpl(AmqpMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
+    public MessageImpl(T amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl)
     {
         _amqpMessage = amqpMessage;
-        _sessionImpl = sessionImpl;
-        _connectionImpl = connectionImpl;
     }
 
-    AmqpMessage getAmqpMessage()
+    T getUnderlyingAmqpMessage(boolean prepareForSending)
     {
-        return _amqpMessage;
+        if(prepareForSending)
+        {
+            return prepareUnderlyingAmqpMessageForSending(_amqpMessage);
+        }
+        else
+        {
+            return _amqpMessage;
+        }
     }
 
+    protected abstract T prepareUnderlyingAmqpMessageForSending(T amqpMessage);
+
+    //======= JMS Methods =======
+
     @Override
     public String getJMSMessageID() throws JMSException
     {
@@ -363,5 +365,4 @@ public class MessageImpl implements Mess
         // TODO Auto-generated method stub
         throw new UnsupportedOperationException("Not Implemented");
     }
-
 }

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ObjectMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import java.io.Serializable;
+
+import javax.jms.JMSException;
+import javax.jms.ObjectMessage;
+
+import org.apache.qpid.jms.engine.AmqpObjectMessage;
+
+public class ObjectMessageImpl extends MessageImpl<AmqpObjectMessage> implements ObjectMessage
+{
+    public ObjectMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        this(new AmqpObjectMessage(), sessionImpl, connectionImpl);
+    }
+
+    public ObjectMessageImpl(AmqpObjectMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        super(amqpMessage, sessionImpl, connectionImpl);
+    }
+
+    @Override
+    protected AmqpObjectMessage prepareUnderlyingAmqpMessageForSending(AmqpObjectMessage amqpMessage)
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    //======= JMS Methods =======
+
+    @Override
+    public void setObject(Serializable object) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public Serializable getObject() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+}

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/ReceiverImpl.java Wed Nov 20 14:34:55 2013
@@ -61,11 +61,10 @@ public class ReceiverImpl extends LinkIm
 
             MessageReceivedPredicate messageReceievedCondition = new MessageReceivedPredicate();
             getConnectionImpl().waitUntil(messageReceievedCondition, timeout);
-
-            //TODO: decide what if any particular message impl class to instantiate
-
             AmqpMessage receivedAmqpMessage = messageReceievedCondition.getReceivedMessage();
-            MessageImpl receivedMessageImpl = new MessageImpl(receivedAmqpMessage, _sessionImpl, getConnectionImpl());
+
+            //TODO: don't create a new factory for every message
+            Message receivedMessage = new MessageFactoryImpl().createJmsMessage(receivedAmqpMessage, _sessionImpl, getConnectionImpl());
 
             //TODO: accepting/settling will be acknowledge-mode dependent
             if(_sessionImpl.getAcknowledgeMode() == Session.AUTO_ACKNOWLEDGE)
@@ -79,7 +78,7 @@ public class ReceiverImpl extends LinkIm
 
             getConnectionImpl().stateChanged();
 
-            return receivedMessageImpl;
+            return receivedMessage;
         }
         catch (JmsTimeoutException e)
         {

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SenderImpl.java Wed Nov 20 14:34:55 2013
@@ -66,10 +66,11 @@ public class SenderImpl extends LinkImpl
     {
         if(message instanceof MessageImpl)
         {
-            return ((MessageImpl)message).getAmqpMessage();
+            return ((MessageImpl<?>)message).getUnderlyingAmqpMessage(true);
         }
         else
         {
+            //TODO
             throw new UnsupportedOperationException("cross-vendor message support has yet to be implemented");
         }
     }

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Wed Nov 20 14:34:55 2013
@@ -203,8 +203,7 @@ public class SessionImpl implements Sess
     @Override
     public BytesMessage createBytesMessage() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        return new BytesMessageImpl(this, getConnectionImpl());
     }
 
     @Override

Added: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java?rev=1543828&view=auto
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java (added)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms.impl;
+
+import javax.jms.JMSException;
+import javax.jms.StreamMessage;
+
+import org.apache.qpid.jms.engine.AmqpListMessage;
+
+public class StreamMessageImpl extends MessageImpl<AmqpListMessage> implements StreamMessage
+{
+    public StreamMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        this(new AmqpListMessage(), sessionImpl, connectionImpl);
+    }
+
+    public StreamMessageImpl(AmqpListMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        super(amqpMessage, sessionImpl, connectionImpl);
+    }
+
+    @Override
+    protected AmqpListMessage prepareUnderlyingAmqpMessageForSending(AmqpListMessage amqpMessage)
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    //======= JMS Methods =======
+
+    @Override
+    public boolean readBoolean() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public byte readByte() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public short readShort() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public char readChar() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public int readInt() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public long readLong() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public float readFloat() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public double readDouble() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public String readString() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public int readBytes(byte[] value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public Object readObject() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeBoolean(boolean value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeByte(byte value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeShort(short value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeChar(char value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeInt(int value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeLong(long value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeFloat(float value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeDouble(double value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeString(String value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeBytes(byte[] value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeBytes(byte[] value, int offset, int length)
+            throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void writeObject(Object value) throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+
+    @Override
+    public void reset() throws JMSException
+    {
+        //TODO
+        throw new UnsupportedOperationException("Not Implemented");
+    }
+}

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/TextMessageImpl.java Wed Nov 20 14:34:55 2013
@@ -21,30 +21,48 @@ package org.apache.qpid.jms.impl;
 import javax.jms.JMSException;
 import javax.jms.TextMessage;
 
-public class TextMessageImpl extends MessageImpl implements TextMessage
+import org.apache.qpid.jms.engine.AmqpTextMessage;
+
+public class TextMessageImpl extends MessageImpl<AmqpTextMessage> implements TextMessage
 {
     public TextMessageImpl(SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        super(sessionImpl, connectionImpl);
+        this((String) null, sessionImpl, connectionImpl);
     }
 
     public TextMessageImpl(String text, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
     {
-        this(sessionImpl, connectionImpl);
+        this(new AmqpTextMessage(), sessionImpl, connectionImpl);
         setText(text);
     }
 
+    public TextMessageImpl(AmqpTextMessage amqpMessage, SessionImpl sessionImpl, ConnectionImpl connectionImpl) throws JMSException
+    {
+        super(amqpMessage, sessionImpl, connectionImpl);
+    }
+
     @Override
-    public String getText() throws JMSException
+    protected AmqpTextMessage prepareUnderlyingAmqpMessageForSending(AmqpTextMessage amqpMessage)
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        //Nothing to do here currently, the message operations are all
+        //already operating on the AmqpMessage directly
+
+        //TODO: do we need to do anything later with properties/headers etc?
+        return amqpMessage;
     }
 
+    //======= JMS Methods =======
+
     @Override
-    public void setText(String string) throws JMSException
+    public String getText() throws JMSException
     {
-        getAmqpMessage().setText(string);
+        return getUnderlyingAmqpMessage(false).getText();
     }
 
+    @Override
+    public void setText(String text) throws JMSException
+    {
+        //TODO: checkWritable();
+        getUnderlyingAmqpMessage(false).setText(text);
+    }
 }

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/IntegrationTestFixture.java Wed Nov 20 14:34:55 2013
@@ -36,7 +36,7 @@ public class IntegrationTestFixture
 
         Connection connection =  new ConnectionImpl("clientName", "localhost", PORT, "guest", "guest");
 
-        assertNull(testPeer.getException());
+        assertNull(testPeer.getThrowable());
         return connection;
     }
 }

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java?rev=1543828&r1=1543827&r2=1543828&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/SessionIntegrationTest.java Wed Nov 20 14:34:55 2013
@@ -18,17 +18,39 @@
  */
 package org.apache.qpid.jms;
 
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
+import java.util.Arrays;
+
+import javax.jms.BytesMessage;
 import javax.jms.Connection;
 import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
 import javax.jms.Session;
+import javax.jms.TextMessage;
 
-import org.apache.qpid.jms.impl.MessageImpl;
+import org.apache.qpid.jms.engine.AmqpBytesMessage;
+import org.apache.qpid.jms.engine.AmqpMessage;
+import org.apache.qpid.jms.engine.AmqpTextMessage;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.DataDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.PropertiesDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedDataMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.junit.Test;
 
 public class SessionIntegrationTest extends QpidJmsTestCase
@@ -73,7 +95,63 @@ public class SessionIntegrationTest exte
     }
 
     @Test
-    public void testSendReceiveTextMessageWithContent() throws Exception
+    public void testReceiveTextMessageWithContent() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            final String expectedMessageContent = "myTextMessage";
+
+            DescribedType amqpValueStringContent = new AmqpValueDescribedType(expectedMessageContent);
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, null, amqpValueStringContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+            assertEquals(expectedMessageContent,((TextMessage)receivedMessage).getText());
+        }
+    }
+
+    @Test
+    public void testSendTextMessageWithoutContent() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true)
+                                    .withEntry(Symbol.valueOf(AmqpMessage.MESSAGE_ANNOTATION_TYPE_KEY_NAME), equalTo(AmqpTextMessage.MSG_TYPE_ANNOTATION_VALUE));
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(null));
+            testPeer.expectTransfer(messageMatcher);
+
+            Message message = session.createTextMessage();
+
+            producer.send(message);
+        }
+    }
+
+    @Test
+    public void testReceiveTextMessageWithoutContent() throws Exception
     {
         try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
         {
@@ -85,17 +163,89 @@ public class SessionIntegrationTest exte
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Queue queue = session.createQueue("myQueue");
 
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(AmqpMessage.MESSAGE_ANNOTATION_TYPE_KEY_NAME,  AmqpTextMessage.MSG_TYPE_ANNOTATION_VALUE);
+            DescribedType amqpValueNullContent = new AmqpValueDescribedType(null);
+
             testPeer.expectReceiverAttach();
-            testPeer.expectLinkFlowRespondWithTransfer();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, amqpValueNullContent);
             testPeer.expectDispositionThatIsAcceptedAndSettled();
 
             MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
 
-            // TODO check that it's a TextMessage with expected content: String expectedText = "myMessage";
-            MessageImpl receivedMessage = (MessageImpl) messageConsumer.receive(1000);
             assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof TextMessage);
+            assertNull(((TextMessage)receivedMessage).getText());
+        }
+    }
+
+    @Test
+    public void testSendBytesMessageWithContent() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            byte[] content = "myBytes".getBytes();
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
+            propertiesMatcher.withContentType(equalTo(Symbol.valueOf(AmqpBytesMessage.CONTENT_TYPE)));
+            messageMatcher.setPropertiesMatcher(propertiesMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedDataMatcher(new Binary(content)));
+
+            testPeer.expectTransfer(messageMatcher);
+
+            BytesMessage message = session.createBytesMessage();
+            message.writeBytes(content);
+
+            producer.send(message);
+        }
+    }
 
-            testPeer.waitForAllHandlersToComplete();
+    @Test
+    public void testReceiveBytesMessageWithContentUsingDataSection() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            PropertiesDescribedType properties = new PropertiesDescribedType();
+            properties.setContentType(Symbol.valueOf(AmqpBytesMessage.CONTENT_TYPE));
+
+            final byte[] expectedContent = "expectedContent".getBytes();
+            DescribedType dataContent = new DataDescribedType(new Binary(expectedContent));
+
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, null, properties, dataContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            assertNotNull(receivedMessage);
+            assertTrue(receivedMessage instanceof BytesMessage);
+            BytesMessage bytesMessage = (BytesMessage)receivedMessage;
+            assertEquals(expectedContent.length, bytesMessage.getBodyLength());
+            byte[] recievedContent = new byte[expectedContent.length];
+            int readBytes = bytesMessage.readBytes(recievedContent);
+            assertEquals(recievedContent.length, readBytes);
+            assertTrue(Arrays.equals(expectedContent, recievedContent));
         }
     }
 }



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org