You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ro...@apache.org on 2014/07/24 18:40:27 UTC

svn commit: r1613183 - in /qpid/jms/trunk/src: main/java/org/apache/qpid/jms/impl/ test/java/org/apache/qpid/jms/ test/java/org/apache/qpid/jms/impl/

Author: robbie
Date: Thu Jul 24 16:40:27 2014
New Revision: 1613183

URL: http://svn.apache.org/r1613183
Log:
QPIDJMS-22: round out basic StreamMessage support

Added:
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/StreamMessageIntegrationTest.java
Modified:
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
    qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
    qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java?rev=1613183&r1=1613182&r2=1613183&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/SessionImpl.java Thu Jul 24 16:40:27 2014
@@ -254,8 +254,7 @@ public class SessionImpl implements Sess
     @Override
     public StreamMessage createStreamMessage() throws JMSException
     {
-        // TODO Auto-generated method stub
-        throw new UnsupportedOperationException("Not Implemented");
+        return new StreamMessageImpl(this, getConnectionImpl());
     }
 
     @Override

Modified: qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java?rev=1613183&r1=1613182&r2=1613183&view=diff
==============================================================================
--- qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java (original)
+++ qpid/jms/trunk/src/main/java/org/apache/qpid/jms/impl/StreamMessageImpl.java Thu Jul 24 16:40:27 2014
@@ -53,8 +53,8 @@ public class StreamMessageImpl extends M
     @Override
     protected AmqpListMessage prepareUnderlyingAmqpMessageForSending(AmqpListMessage amqpMessage)
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        //Currently nothing to do, we always operate directly on the underlying AmqpListMessage.
+        return amqpMessage;
     }
 
     private void checkObjectType(Object value) throws QpidJmsMessageFormatException
@@ -73,13 +73,15 @@ public class StreamMessageImpl extends M
         }
     }
 
-    private Object readObjectInternal(boolean checkExistingReadBytesUsage) throws MessageEOFException, QpidJmsMessageFormatException
+    private Object readObjectInternal(boolean checkExistingReadBytesUsage) throws MessageEOFException, QpidJmsMessageFormatException, MessageNotReadableException
     {
+        checkBodyReadable();
+
         if(checkExistingReadBytesUsage)
         {
             if(_remainingBytes != NO_BYTES_IN_FLIGHT)
             {
-                throw new QpidJmsMessageFormatException("Partially read bytes entry still being retrieved using readBytes(byte[] dest)");
+                throw new QpidJmsMessageFormatException("Partially read byte[] entry still being retrieved using readBytes(byte[] dest)");
             }
         }
 
@@ -93,6 +95,11 @@ public class StreamMessageImpl extends M
         }
     }
 
+    private void decrementStreamPosition()
+    {
+        getUnderlyingAmqpMessage(false).decrementPosition();
+    }
+
     //======= JMS Methods =======
 
     @Override
@@ -109,6 +116,7 @@ public class StreamMessageImpl extends M
         }
         else
         {
+            decrementStreamPosition();
             throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to boolean.");
         }
     }
@@ -116,57 +124,230 @@ public class StreamMessageImpl extends M
     @Override
     public byte readByte() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof Byte)
+        {
+            return (Byte) o;
+        }
+        else if(o instanceof String || o == null)
+        {
+            try
+            {
+                return Byte.valueOf((String)o);
+            }
+            catch(RuntimeException e)
+            {
+                decrementStreamPosition();
+                throw e;
+            }
+        }
+        else
+        {
+            decrementStreamPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to byte.");
+        }
     }
 
     @Override
     public short readShort() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof Short)
+        {
+            return (Short) o;
+        }
+        else if(o instanceof Byte)
+        {
+            return (Byte) o;
+        }
+        else if(o instanceof String || o == null)
+        {
+            try
+            {
+                return Short.valueOf((String)o);
+            }
+            catch(RuntimeException e)
+            {
+                decrementStreamPosition();
+                throw e;
+            }
+        }
+        else
+        {
+            decrementStreamPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to short.");
+        }
     }
 
     @Override
     public char readChar() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof Character)
+        {
+            return (Character) o;
+        }
+        else if(o == null)
+        {
+            decrementStreamPosition();
+            throw new NullPointerException("Stream entry with null value cannot be converted to char.");
+        }
+        else
+        {
+            decrementStreamPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to char.");
+        }
     }
 
     @Override
     public int readInt() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof Integer)
+        {
+            return (Integer) o;
+        }
+        else if(o instanceof Short)
+        {
+            return (Short) o;
+        }
+        else if(o instanceof Byte)
+        {
+            return (Byte) o;
+        }
+        else if(o instanceof String || o == null)
+        {
+            try
+            {
+                return Integer.valueOf((String)o);
+            }
+            catch(RuntimeException e)
+            {
+                decrementStreamPosition();
+                throw e;
+            }
+        }
+        else
+        {
+            decrementStreamPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to int.");
+        }
     }
 
     @Override
     public long readLong() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof Long)
+        {
+            return (Long) o;
+        }
+        else if(o instanceof Integer)
+        {
+            return (Integer) o;
+        }
+        else if(o instanceof Short)
+        {
+            return (Short) o;
+        }
+        else if(o instanceof Byte)
+        {
+            return (Byte) o;
+        }
+        else if(o instanceof String || o == null)
+        {
+            try
+            {
+                return Long.valueOf((String)o);
+            }
+            catch(RuntimeException e)
+            {
+                decrementStreamPosition();
+                throw e;
+            }
+        }
+        else
+        {
+            decrementStreamPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to long.");
+        }
     }
 
     @Override
     public float readFloat() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof Float)
+        {
+            return (Float) o;
+        }
+        else if(o instanceof String || o == null)
+        {
+            try
+            {
+                return Float.valueOf((String)o);
+            }
+            catch(RuntimeException e)
+            {
+                decrementStreamPosition();
+                throw e;
+            }
+        }
+        else
+        {
+            decrementStreamPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to float.");
+        }
     }
 
     @Override
     public double readDouble() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof Float)
+        {
+            return (Float) o;
+        }
+        else if(o instanceof Double)
+        {
+            return (Double) o;
+        }
+        else if(o instanceof String || o == null)
+        {
+            try
+            {
+                return Double.valueOf((String)o);
+            }
+            catch(RuntimeException e)
+            {
+                decrementStreamPosition();
+                throw e;
+            }
+        }
+        else
+        {
+            decrementStreamPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to double.");
+        }
     }
 
     @Override
     public String readString() throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        Object o = readObject();
+        if(o instanceof String || o == null)
+        {
+            return (String) o;
+        }
+        else if(o instanceof byte[])
+        {
+            decrementStreamPosition();
+            throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to String.");
+        }
+        else
+        {
+            return String.valueOf(o);
+        }
     }
 
     @Override
@@ -222,15 +403,14 @@ public class StreamMessageImpl extends M
             }
             else
             {
-                //More work to do to complete reading this field, move the position back.
-                getUnderlyingAmqpMessage(false).decrementPosition();
+                decrementStreamPosition();
             }
 
             return lengthToCopy;
         }
         else
         {
-            getUnderlyingAmqpMessage(false).decrementPosition();
+            decrementStreamPosition();
             throw new QpidJmsMessageFormatException("Stream entry of type " + o.getClass().getName() + " cannot be converted to bytes.");
         }
     }
@@ -250,57 +430,49 @@ public class StreamMessageImpl extends M
     @Override
     public void writeByte(byte value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override
     public void writeShort(short value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override
     public void writeChar(char value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override
     public void writeInt(int value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override
     public void writeLong(long value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override
     public void writeFloat(float value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override
     public void writeDouble(double value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override
     public void writeString(String value) throws JMSException
     {
-        //TODO
-        throw new UnsupportedOperationException("Not Implemented");
+        writeObject(value);
     }
 
     @Override

Added: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/StreamMessageIntegrationTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/StreamMessageIntegrationTest.java?rev=1613183&view=auto
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/StreamMessageIntegrationTest.java (added)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/StreamMessageIntegrationTest.java Thu Jul 24 16:40:27 2014
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.Connection;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+
+import org.apache.qpid.jms.impl.ClientProperties;
+import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType;
+import org.apache.qpid.jms.test.testpeer.describedtypes.sections.MessageAnnotationsDescribedType;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSectionMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
+import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
+import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.DescribedType;
+import org.apache.qpid.proton.amqp.Symbol;
+import org.junit.Test;
+
+public class StreamMessageIntegrationTest extends QpidJmsTestCase
+{
+    private final IntegrationTestFixture _testFixture = new IntegrationTestFixture();
+
+    /**
+     * Test that a message received from the test peer with an AmqpValue section containing
+     * a list which holds entries of the various supported entry types is returned as a
+     * {@link MapMessage}, and verify the values can all be retrieved as expected.
+     */
+    @Test
+    public void testReceiveBasicMapMessage() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            connection.start();
+
+            testPeer.expectBegin(true);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+
+            //Prepare an AMQP message for the test peer to send, containing an
+            //AmqpValue section holding a list with entries for each supported type,
+            //and annotated as a JMS stream message.
+            boolean myBool = true;
+            byte myByte = Byte.MAX_VALUE;
+            char myChar = 'c';
+            double myDouble = 1234567890123456789.1234;
+            float myFloat = 1.1F;
+            int myInt = Integer.MAX_VALUE;
+            long myLong = Long.MAX_VALUE;
+            short myShort = Short.MAX_VALUE;
+            String myString = "myString";
+            byte[] myBytes = "myBytes".getBytes();
+
+            List<Object> list = new ArrayList<Object>();
+            list.add(myBool);
+            list.add(myByte);
+            list.add(new Binary(myBytes));//the underlying AMQP message uses Binary rather than byte[] directly.
+            list.add(myChar);
+            list.add(myDouble);
+            list.add(myFloat);
+            list.add(myInt);
+            list.add(myLong);
+            list.add(myShort);
+            list.add(myString);
+
+            MessageAnnotationsDescribedType msgAnnotations = new MessageAnnotationsDescribedType();
+            msgAnnotations.setSymbolKeyedAnnotation(ClientProperties.X_OPT_JMS_MSG_TYPE, ClientProperties.STREAM_MESSAGE_TYPE);
+
+            DescribedType amqpValueSectionContent = new AmqpValueDescribedType(list);
+
+            //receive the message from the test peer
+            testPeer.expectReceiverAttach();
+            testPeer.expectLinkFlowRespondWithTransfer(null, msgAnnotations, null, null, amqpValueSectionContent);
+            testPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            MessageConsumer messageConsumer = session.createConsumer(queue);
+            Message receivedMessage = messageConsumer.receive(1000);
+            testPeer.waitForAllHandlersToComplete(3000);
+
+            //verify the content is as expected
+            assertNotNull("Message was not received", receivedMessage);
+            assertTrue("Message was not a MapMessage", receivedMessage instanceof StreamMessage);
+            StreamMessage receivedStreamMessage  = (StreamMessage) receivedMessage;
+
+            assertEquals("Unexpected boolean value", myBool, receivedStreamMessage.readBoolean());
+            assertEquals("Unexpected byte value", myByte, receivedStreamMessage.readByte());
+            byte[] readBytes = (byte[]) receivedStreamMessage.readObject();//using readObject to get a new byte[]
+            assertArrayEquals("Read bytes were not as expected", myBytes, readBytes);
+            assertEquals("Unexpected char value", myChar, receivedStreamMessage.readChar());
+            assertEquals("Unexpected double value", myDouble, receivedStreamMessage.readDouble(), 0.0);
+            assertEquals("Unexpected float value", myFloat, receivedStreamMessage.readFloat(), 0.0);
+            assertEquals("Unexpected int value", myInt, receivedStreamMessage.readInt());
+            assertEquals("Unexpected long value", myLong, receivedStreamMessage.readLong());
+            assertEquals("Unexpected short value", myShort, receivedStreamMessage.readShort());
+            assertEquals("Unexpected UTF value", myString, receivedStreamMessage.readString());
+        }
+    }
+
+/*
+ * TODO: decide what to do about this
+ *
+ * The test below fails if a char is added, because the DataImpl-based decoder used by the test peer
+ * decodes the char to an Integer object and thus the EncodedAmqpValueMatcher fails the comparison
+ * of its contained map due to the differing types. This doesn't happen in the above test as the
+ * reversed roles mean it is DecoderImpl doing the decoding and it casts the output to a char.
+ *
+ * The below test has a hack to 'expect an int' to work round this currently.
+
+    /**
+     * Test that sending a stream message to the test peer results in receipt of a message with
+     * an AmqpValue section containing a list which holds entries of the various supported entry
+     * types with the expected values.
+     */
+    @Test
+    public void testSendBasicMapMessage() throws Exception
+    {
+        try(TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);)
+        {
+            Connection connection = _testFixture.establishConnecton(testPeer);
+            testPeer.expectBegin(true);
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageProducer producer = session.createProducer(queue);
+
+            boolean myBool = true;
+            byte myByte = Byte.MAX_VALUE;
+            char myChar = 'c';
+            double myDouble = 1234567890123456789.1234;
+            float myFloat = 1.1F;
+            int myInt = Integer.MAX_VALUE;
+            long myLong = Long.MAX_VALUE;
+            short myShort = Short.MAX_VALUE;
+            String myString = "myString";
+            byte[] myBytes = "myBytes".getBytes();
+
+            //Prepare a MapMessage to send to the test peer to send
+            StreamMessage streamMessage = session.createStreamMessage();
+
+            streamMessage.writeBoolean(myBool);
+            streamMessage.writeByte(myByte);
+            streamMessage.writeBytes(myBytes);
+            streamMessage.writeChar(myChar);
+            streamMessage.writeDouble(myDouble);
+            streamMessage.writeFloat(myFloat);
+            streamMessage.writeInt(myInt);
+            streamMessage.writeLong(myLong);
+            streamMessage.writeShort(myShort);
+            streamMessage.writeString(myString);
+
+            //prepare a matcher for the test peer to use to receive and verify the message
+            List<Object> list = new ArrayList<Object>();
+            list.add(myBool);
+            list.add(myByte);
+            list.add(new Binary(myBytes));//the underlying AMQP message uses Binary rather than byte[] directly.
+            list.add((int) myChar);//TODO: see note above about chars
+            list.add(myDouble);
+            list.add(myFloat);
+            list.add(myInt);
+            list.add(myLong);
+            list.add(myShort);
+            list.add(myString);
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            msgAnnotationsMatcher.withEntry(Symbol.valueOf(ClientProperties.X_OPT_JMS_MSG_TYPE), equalTo(ClientProperties.STREAM_MESSAGE_TYPE));
+            MessagePropertiesSectionMatcher propertiesMatcher = new MessagePropertiesSectionMatcher(true);
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+            messageMatcher.setPropertiesMatcher(propertiesMatcher);
+            messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(list));
+
+            //send the message
+            testPeer.expectTransfer(messageMatcher);
+            producer.send(streamMessage);
+        }
+    }
+}

Modified: qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java
URL: http://svn.apache.org/viewvc/qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java?rev=1613183&r1=1613182&r2=1613183&view=diff
==============================================================================
--- qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java (original)
+++ qpid/jms/trunk/src/test/java/org/apache/qpid/jms/impl/StreamMessageImplTest.java Thu Jul 24 16:40:27 2014
@@ -24,13 +24,16 @@ import static org.junit.Assert.assertArr
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
 import static org.junit.Assert.fail;
 
+import java.math.BigInteger;
 import java.util.Arrays;
 
 import javax.jms.JMSException;
 import javax.jms.MessageEOFException;
 import javax.jms.MessageFormatException;
+import javax.jms.MessageNotReadableException;
 import javax.jms.StreamMessage;
 
 import org.apache.qpid.jms.QpidJmsTestCase;
@@ -98,6 +101,121 @@ public class StreamMessageImplTest exten
         }
     }
 
+    @Test
+    public void testNewMessageIsWriteOnlyThrowsMNRE() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        try
+        {
+            streamMessageImpl.readBoolean();
+            fail("Expected exception to be thrown as message is not readable");
+        }
+        catch(MessageNotReadableException mnre)
+        {
+            //expected
+        }
+    }
+
+    /**
+     * Verify the stream position is not incremented during illegal type conversion failure.
+     * This covers every read method except readObject (which doesn't do type conversion)
+     * and readBytes(), which is tested by {@link #testIllegalTypeConvesionFailureDoesNotIncrementPosition2}
+     *
+     * Write bytes, then deliberately try to retrieve them as illegal types, then
+     * check they can be successfully read.
+     */
+    @Test
+    public void testIllegalTypeConvesionFailureDoesNotIncrementPosition1() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] bytes =  new byte[]{(byte)0, (byte)255, (byte)78};
+
+        streamMessageImpl.writeBytes(bytes);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Short.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Integer.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Long.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Double.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, String.class);
+
+        byte[] retrievedByteArray = new byte[bytes.length];
+        int readBytesLength = streamMessageImpl.readBytes(retrievedByteArray);
+
+        assertEquals("Number of bytes read did not match original array length", bytes.length, readBytesLength);
+        assertArrayEquals("Expected array to equal retrieved bytes", bytes, retrievedByteArray);
+        assertEquals("Expected completion return value", -1, streamMessageImpl.readBytes(retrievedByteArray));
+    }
+
+    /**
+     * Verify the stream position is not incremented during illegal type conversion failure.
+     * This test covers only readBytes, other methods are tested by
+     * {@link #testIllegalTypeConvesionFailureDoesNotIncrementPosition1}
+     *
+     * Write String, then deliberately try illegal retrieval as bytes, then
+     * check it can be successfully read.
+     */
+    @Test
+    public void testIllegalTypeConvesionFailureDoesNotIncrementPosition2() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        String stringVal = "myString";
+        streamMessageImpl.writeString(stringVal);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
+
+        assertEquals("Expected written string", stringVal, streamMessageImpl.readString());
+    }
+
+    /**
+     * When a null stream entry is encountered, the accessor methods is type dependent and should
+     * either return null, throw NPE, or behave in the same fashion as <primitive>.valueOf(String).
+     *
+     * Test that this is the case, and in doing show demonstrate that primitive type conversion failure
+     * does not increment the stream position, as shown by not hitting the end of the stream unexpectedly.
+     */
+    @Test
+    public void testNullStreamEntryResultsInExpectedBehaviour() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        streamMessageImpl.writeObject(null);
+        streamMessageImpl.reset();
+
+        //expect an NFE from the primitive integral <type>.valueOf(null) conversions
+        assertGetStreamEntryThrowsNumberFormatException(streamMessageImpl, Byte.class);
+        assertGetStreamEntryThrowsNumberFormatException(streamMessageImpl, Short.class);
+        assertGetStreamEntryThrowsNumberFormatException(streamMessageImpl, Integer.class);
+        assertGetStreamEntryThrowsNumberFormatException(streamMessageImpl, Long.class);
+
+        //expect an NPE from the primitive float, double, and char <type>.valuleOf(null) conversions
+        assertGetStreamEntryThrowsNullPointerException(streamMessageImpl, Float.class);
+        assertGetStreamEntryThrowsNullPointerException(streamMessageImpl, Double.class);
+        assertGetStreamEntryThrowsNullPointerException(streamMessageImpl, Character.class);
+
+        //expect null
+        assertNull(streamMessageImpl.readObject());
+        streamMessageImpl.reset(); //need to reset as read was a success
+        assertNull(streamMessageImpl.readString());
+        streamMessageImpl.reset(); //need to reset as read was a success
+
+        //expect completion value.
+        assertEquals(-1, streamMessageImpl.readBytes(new byte[1]));
+        streamMessageImpl.reset(); //need to reset as read was a success
+
+        //expect false from Boolean.valueOf(null).
+        assertFalse(streamMessageImpl.readBoolean());
+        streamMessageImpl.reset(); //need to reset as read was a success
+    }
+
     // ======= object =========
 
     @Test
@@ -106,7 +224,7 @@ public class StreamMessageImplTest exten
         StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
         try
         {
-            streamMessageImpl.writeObject(new Exception());
+            streamMessageImpl.writeObject(BigInteger.ONE);
             fail("Expected exception to be thrown");
         }
         catch(MessageFormatException mfe)
@@ -115,6 +233,50 @@ public class StreamMessageImplTest exten
         }
     }
 
+    @Test
+    public void testWriteReadObject() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        Object nullEntryValue = null;
+        Boolean boolEntryValue = Boolean.valueOf(false);
+        Byte byteEntryValue = Byte.valueOf((byte)1);
+        Short shortEntryValue = Short.valueOf((short)2);
+        Integer intEntryValue = Integer.valueOf(3);
+        Long longEntryValue = Long.valueOf(4);
+        Float floatEntryValue = Float.valueOf(5.01F);
+        Double doubleEntryValue = Double.valueOf(6.01);
+        String stringEntryValue = "string";
+        Character charEntryValue = Character.valueOf('c');
+        byte[] bytes = new byte[] { (byte)1, (byte) 170, (byte)65};
+
+        streamMessageImpl.writeObject(nullEntryValue);
+        streamMessageImpl.writeObject(boolEntryValue);
+        streamMessageImpl.writeObject(byteEntryValue);
+        streamMessageImpl.writeObject(shortEntryValue);
+        streamMessageImpl.writeObject(intEntryValue);
+        streamMessageImpl.writeObject(longEntryValue);
+        streamMessageImpl.writeObject(floatEntryValue);
+        streamMessageImpl.writeObject(doubleEntryValue);
+        streamMessageImpl.writeObject(stringEntryValue);
+        streamMessageImpl.writeObject(charEntryValue);
+        streamMessageImpl.writeObject(bytes);
+
+        streamMessageImpl.reset();
+
+        assertEquals("Got unexpected value from stream", nullEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", boolEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", byteEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", shortEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", intEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", longEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", floatEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", doubleEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", stringEntryValue, streamMessageImpl.readObject());
+        assertEquals("Got unexpected value from stream", charEntryValue, streamMessageImpl.readObject());
+        assertArrayEquals("Got unexpected value from stream", bytes, (byte[])streamMessageImpl.readObject());
+    }
+
     // ======= bytes =========
 
     /**
@@ -151,7 +313,6 @@ public class StreamMessageImplTest exten
         streamMessageImpl.reset();
 
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
-        /* TODO: enable when implementing
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Short.class);
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
@@ -160,7 +321,50 @@ public class StreamMessageImplTest exten
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Double.class);
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, String.class);
-        */
+    }
+
+    @Test
+    public void testReadBytesWithNullSignalsCompletion() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+        streamMessageImpl.writeObject(null);
+
+        streamMessageImpl.reset();
+
+        assertEquals("Expected immediate completion signal", -1, streamMessageImpl.readBytes(new byte[1]));
+    }
+
+    @Test
+    public void testReadBytesWithZeroLengthSource() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        streamMessageImpl.writeBytes(new byte[0]);
+
+        streamMessageImpl.reset();
+
+        byte[] fullRetrievedBytes = new byte[1];
+
+        assertEquals("Expected no bytes to be read, as none were written", 0, streamMessageImpl.readBytes(fullRetrievedBytes));
+    }
+
+    @Test
+    public void testReadBytesWithZeroLengthDestination() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] bytes = new byte[]{(byte)11, (byte)44, (byte)99};
+        streamMessageImpl.writeBytes(bytes);
+
+        streamMessageImpl.reset();
+
+        byte[] zeroDestination = new byte[0];
+        byte[] fullRetrievedBytes = new byte[bytes.length];
+
+        assertEquals("Expected no bytes to be read", 0, streamMessageImpl.readBytes(zeroDestination));
+        assertEquals("Expected all bytes to be read", bytes.length, streamMessageImpl.readBytes(fullRetrievedBytes));
+        assertArrayEquals("Expected arrays to be equal", bytes, fullRetrievedBytes);
+        assertEquals("Expected completion signal", -1, streamMessageImpl.readBytes(zeroDestination));
     }
 
     @Test
@@ -291,8 +495,49 @@ public class StreamMessageImplTest exten
                 Arrays.copyOfRange(bytes, partialLength, bytes.length), Arrays.copyOfRange(retrievedByteArray, 0, readBytesLength));
     }
 
+    /**
+     * Verify that setting bytes takes a copy of the array.
+     * Set bytes subset, then retrieve the entry and verify the are different arrays and the subsets are equal.
+     */
+    @Test
+    public void testWriteBytesWithOffsetAndLength() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte[] orig = "myBytesAll".getBytes();
+
+        //extract the segment containing 'Bytes'
+        int offset = 2;
+        int length = 5;
+        byte[] segment = Arrays.copyOfRange(orig, offset, offset + length);
+
+        //set the same section from the original bytes
+        streamMessageImpl.writeBytes(orig, offset, length);
+        streamMessageImpl.reset();
+
+        byte[] retrieved = (byte[]) streamMessageImpl.readObject();
+
+        //verify the retrieved bytes from the stream equal the segment but are not the same
+        assertNotSame(orig, retrieved);
+        assertNotSame(segment, retrieved);
+        assertArrayEquals(segment, retrieved);
+    }
+
     //========= boolean ========
 
+    @Test
+    public void testWriteReadBoolean() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        boolean value = true;
+
+        streamMessageImpl.writeBoolean(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readBoolean());
+    }
+
     /**
      * Set a boolean, then retrieve it as all of the legal type combinations to verify it is parsed correctly
      */
@@ -306,11 +551,8 @@ public class StreamMessageImplTest exten
         streamMessageImpl.writeBoolean(value);
         streamMessageImpl.reset();
 
-        assertGetStreamEntryEquals(streamMessageImpl, value, Boolean.class);
-
-        /* TODO: enable when implementing
-        assertGetStreamEntryEquals(streamMessageImpl, String.valueOf(value), String.class);
-        */
+        assertGetStreamEntryEquals(streamMessageImpl, true, value, Boolean.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(value), String.class);
     }
 
     /**
@@ -326,7 +568,6 @@ public class StreamMessageImplTest exten
         streamMessageImpl.writeBoolean(value);
         streamMessageImpl.reset();
 
-        /* TODO: enable when implementing
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Short.class);
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
@@ -335,12 +576,475 @@ public class StreamMessageImplTest exten
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Double.class);
         assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
-        */
+    }
+
+    //========= string ========
+
+    @Test
+    public void testWriteReadString() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        String value = "myString";
+
+        streamMessageImpl.writeString(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readString());
+    }
+
+    /**
+     * Set a string, then retrieve it as all of the legal type combinations to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteStringReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        String integralValue = String.valueOf(Byte.MAX_VALUE);
+        streamMessageImpl.writeString(integralValue);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(integralValue), String.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Boolean.valueOf(integralValue), Boolean.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Byte.valueOf(integralValue), Byte.class);
+
+        streamMessageImpl.clearBody();
+        integralValue = String.valueOf(Short.MAX_VALUE);
+        streamMessageImpl.writeString(integralValue);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Short.valueOf(integralValue), Short.class);
+
+        streamMessageImpl.clearBody();
+        integralValue = String.valueOf(Integer.MAX_VALUE);
+        streamMessageImpl.writeString(integralValue);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Integer.valueOf(integralValue), Integer.class);
+
+        streamMessageImpl.clearBody();
+        integralValue = String.valueOf(Long.MAX_VALUE);
+        streamMessageImpl.writeString(integralValue);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Long.valueOf(integralValue), Long.class);
+
+        streamMessageImpl.clearBody();
+        String fpValue = String.valueOf(Float.MAX_VALUE);
+        streamMessageImpl.writeString(fpValue);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Float.valueOf(fpValue), Float.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Double.valueOf(fpValue), Double.class);
+    }
+
+    /**
+     * Set a string, then retrieve it as all of the illegal type combinations to verify it fails as expected
+     */
+    @Test
+    public void testWriteStringReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        String stringValue = "myString";
+
+        streamMessageImpl.writeString(stringValue);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
+    }
+
+    //========= byte ========
+
+    @Test
+    public void testWriteReadByte() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte value = (byte)6;
+
+        streamMessageImpl.writeByte(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readByte());
+    }
+
+    /**
+     * Set a byte, then retrieve it as all of the legal type combinations to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteByteReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte value = (byte)6;
+
+        streamMessageImpl.writeByte(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Byte.valueOf(value), Byte.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Short.valueOf(value), Short.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Integer.valueOf(value), Integer.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Long.valueOf(value), Long.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(value), String.class);
+    }
+
+    /**
+     * Set a byte, then retrieve it as all of the illegal type combinations to verify it fails as expected
+     */
+    @Test
+    public void testWriteByteReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        byte value = (byte)6;
+
+        streamMessageImpl.writeByte(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Double.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
+    }
+
+    //========= short ========
+
+    @Test
+    public void testWriteReadShort() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        short value = (short)302;
+
+        streamMessageImpl.writeShort(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readShort());
+    }
+
+    /**
+     * Set a short, then retrieve it as all of the legal type combinations to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteShortReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        short value = (short)302;
+
+        streamMessageImpl.writeShort(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Short.valueOf(value), Short.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Integer.valueOf(value), Integer.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Long.valueOf(value), Long.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(value), String.class);
+    }
+
+    /**
+     * Set a short, then retrieve it as all of the illegal type combinations to verify it fails as expected
+     */
+    @Test
+    public void testWriteShortReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        short value = (short)302;
+
+        streamMessageImpl.writeShort(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Double.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
+    }
+
+    //========= char ========
+
+    @Test
+    public void testWriteReadChar() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        char value = 'c';
+
+        streamMessageImpl.writeChar(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readChar());
+    }
+
+    /**
+     * Set a char, then retrieve it as all of the legal type combinations to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteCharReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        char value = 'c';
+
+        streamMessageImpl.writeChar(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, value, Character.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(value), String.class);
+    }
+
+    /**
+     * Set a char, then retrieve it as all of the illegal type combinations to verify it fails as expected
+     */
+    @Test
+    public void testWriteCharReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        char value = 'c';
+
+        streamMessageImpl.writeChar(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Short.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Integer.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Long.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Double.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
+    }
+
+    //========= int ========
+
+    @Test
+    public void testWriteReadInt() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        int value = Integer.MAX_VALUE;
+
+        streamMessageImpl.writeInt(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readInt());
+    }
+
+    /**
+     * Set an int, then retrieve it as all of the legal type combinations to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteIntReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        int value = Integer.MAX_VALUE;
+
+        streamMessageImpl.writeInt(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Integer.valueOf(value), Integer.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Long.valueOf(value), Long.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(value), String.class);
+    }
+
+    /**
+     * Set an int, then retrieve it as all of the illegal type combinations to verify it fails as expected
+     */
+    @Test
+    public void testWriteIntReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        int value = Integer.MAX_VALUE;
+
+        streamMessageImpl.writeInt(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Short.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Double.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
+    }
+
+    //========= long ========
+
+    @Test
+    public void testWriteReadLong() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        long value = Long.MAX_VALUE;
+
+        streamMessageImpl.writeLong(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readLong());
+    }
+
+    /**
+     * Set a long, then retrieve it as all of the legal type combinations to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteLongReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        long value = Long.MAX_VALUE;
+
+        streamMessageImpl.writeLong(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Long.valueOf(value), Long.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(value), String.class);
+    }
+
+    /**
+     * Set a long, then retrieve it as all of the illegal type combinations to verify it fails as expected
+     */
+    @Test
+    public void testWriteLongReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        long value = Long.MAX_VALUE;
+
+        streamMessageImpl.writeLong(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Short.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Integer.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Double.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
+    }
+
+    //========= float ========
+
+    @Test
+    public void testWriteReadFloat() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        float value = Float.MAX_VALUE;
+
+        streamMessageImpl.writeFloat(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readFloat(), 0.0);
+    }
+
+    /**
+     * Set a float, then retrieve it as all of the legal type combinations to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteFloatReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        float value = Float.MAX_VALUE;
+
+        streamMessageImpl.writeFloat(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Float.valueOf(value), Float.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, Double.valueOf(value), Double.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(value), String.class);
+    }
+
+    /**
+     * Set a float, then retrieve it as all of the illegal type combinations to verify it fails as expected
+     */
+    @Test
+    public void testWriteFloatReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        float value = Float.MAX_VALUE;
+
+        streamMessageImpl.writeFloat(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Short.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Integer.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Long.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
+    }
+
+    //========= double ========
+
+    @Test
+    public void testWriteReadDouble() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        double value = Double.MAX_VALUE;
+
+        streamMessageImpl.writeDouble(value);
+        streamMessageImpl.reset();
+
+        assertEquals("Value not as expected", value, streamMessageImpl.readDouble(), 0.0);
+    }
+
+    /**
+     * Set a double, then retrieve it as all of the legal type combinations to verify it is parsed correctly
+     */
+    @Test
+    public void testWriteDoubleReadLegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        double value = Double.MAX_VALUE;
+
+        streamMessageImpl.writeDouble(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryEquals(streamMessageImpl, true, Double.valueOf(value), Double.class);
+        assertGetStreamEntryEquals(streamMessageImpl, true, String.valueOf(value), String.class);
+    }
+
+    /**
+     * Set a double, then retrieve it as all of the illegal type combinations to verify it fails as expected
+     */
+    @Test
+    public void testWriteDoubleReadIllegal() throws Exception
+    {
+        StreamMessageImpl streamMessageImpl = new StreamMessageImpl(_mockSessionImpl,_mockConnectionImpl);
+
+        double value = Double.MAX_VALUE;
+
+        streamMessageImpl.writeDouble(value);
+        streamMessageImpl.reset();
+
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Boolean.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Byte.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Short.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Character.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Integer.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Long.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, Float.class);
+        assertGetStreamEntryThrowsMessageFormatException(streamMessageImpl, byte[].class);
     }
 
     //========= utility methods ========
 
     private void assertGetStreamEntryEquals(StreamMessageImpl testMessage,
+                                            boolean resetStreamAfter,
                                             Object expectedValue,
                                             Class<?> clazz) throws JMSException
     {
@@ -351,6 +1055,11 @@ public class StreamMessageImplTest exten
 
         Object actualValue = getStreamEntryUsingTypeMethod(testMessage, clazz, null);
         assertEquals(expectedValue, actualValue);
+
+        if(resetStreamAfter)
+        {
+            testMessage.reset();
+        }
     }
 
     private void assertGetStreamEntryThrowsMessageFormatException(StreamMessageImpl testMessage,
@@ -368,6 +1077,20 @@ public class StreamMessageImplTest exten
         }
     }
 
+    private void assertGetStreamEntryThrowsNullPointerException(StreamMessageImpl testMessage, Class<?> clazz) throws JMSException
+    {
+        try
+        {
+            getStreamEntryUsingTypeMethod(testMessage, clazz, new byte[0]);
+
+            fail("expected exception to be thrown");
+        }
+        catch(NullPointerException npe)
+        {
+            //expected
+        }
+    }
+
     private void assertGetStreamEntryThrowsNumberFormatException(StreamMessageImpl testMessage,
                                                                  Class<?> clazz) throws JMSException
     {



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org