You are viewing a plain-text version of this content; the hyperlink to the canonical version was not preserved in this copy.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/11 17:45:24 UTC

svn commit: r1348915 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/commands/ main/activemq/util/ main/cms/ test/activemq/commands/

Author: tabish
Date: Mon Jun 11 15:45:23 2012
New Revision: 1348915

URL: http://svn.apache.org/viewvc?rev=1348915&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-411

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h Mon Jun 11 15:45:23 2012
@@ -146,6 +146,8 @@ namespace commands {
                     case util::PrimitiveValueNode::STRING_TYPE:
                     case util::PrimitiveValueNode::BIG_STRING_TYPE:
                         return cms::Message::STRING_TYPE;
+                    case util::PrimitiveValueNode::BYTE_ARRAY_TYPE:
+                        return cms::Message::BYTE_ARRAY_TYPE;
                     default:
                         break;
                 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp Mon Jun 11 15:45:23 2012
@@ -36,6 +36,7 @@
 #include <decaf/lang/Long.h>
 #include <decaf/lang/Double.h>
 #include <decaf/lang/Float.h>
+#include <decaf/io/ByteArrayOutputStream.h>
 #include <decaf/io/ByteArrayInputStream.h>
 #include <decaf/io/BufferedInputStream.h>
 #include <decaf/util/zip/DeflaterOutputStream.h>
@@ -57,15 +58,46 @@ using namespace decaf::util;
 using namespace decaf::util::zip;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace commands {
+
+    class ActiveMQStreamMessageImpl {
+    private:
+
+        ActiveMQStreamMessageImpl(const ActiveMQStreamMessageImpl&);
+        ActiveMQStreamMessageImpl& operator= (const ActiveMQStreamMessageImpl&);
+
+    public:
+
+        ActiveMQStreamMessageImpl() : bytesOut(NULL), remainingBytes(-1) {}
+        ~ActiveMQStreamMessageImpl() {}
+
+    public:
+
+        // Holds the contents of the message once written.
+        decaf::io::ByteArrayOutputStream* bytesOut;
+
+        // While a byte array field is being read, the number of bytes of that field
+        // not yet returned by readBytes; -1 when no such read is in progress.
+        mutable int remainingBytes;
+
+    };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
 ActiveMQStreamMessage::ActiveMQStreamMessage() :
-    ActiveMQMessageTemplate< cms::StreamMessage >(), bytesOut(NULL), dataIn(), dataOut(), remainingBytes(-1) {
+    ActiveMQMessageTemplate<cms::StreamMessage>(), impl(new ActiveMQStreamMessageImpl()), dataIn(), dataOut() {
 
     this->clearBody();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQStreamMessage::~ActiveMQStreamMessage() throw() {
-    this->reset();
+ActiveMQStreamMessage::~ActiveMQStreamMessage() throw () {
+    try {
+        this->reset();
+        delete impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -75,41 +107,39 @@ unsigned char ActiveMQStreamMessage::get
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQStreamMessage* ActiveMQStreamMessage::cloneDataStructure() const {
-    std::auto_ptr<ActiveMQStreamMessage> message( new ActiveMQStreamMessage() );
-    message->copyDataStructure( this );
+    std::auto_ptr<ActiveMQStreamMessage> message(new ActiveMQStreamMessage());
+    message->copyDataStructure(this);
     return message.release();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::copyDataStructure( const DataStructure* src ) {
+void ActiveMQStreamMessage::copyDataStructure(const DataStructure* src) {
 
     // Protect against invalid self assignment.
-    if( this == src ) {
+    if (this == src) {
         return;
     }
 
-    const ActiveMQStreamMessage* srcPtr = dynamic_cast<const ActiveMQStreamMessage*>( src );
+    const ActiveMQStreamMessage* srcPtr = dynamic_cast<const ActiveMQStreamMessage*> (src);
 
-    if( srcPtr == NULL || src == NULL ) {
-        throw decaf::lang::exceptions::NullPointerException(
-            __FILE__, __LINE__,
-            "ActiveMQStreamMessage::copyDataStructure - src is NULL or invalid" );
+    if (srcPtr == NULL || src == NULL) {
+        throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__, "ActiveMQStreamMessage::copyDataStructure - src is NULL or invalid");
     }
 
-    ActiveMQStreamMessage* nonConstSrc = const_cast<ActiveMQStreamMessage*>( srcPtr );
+    ActiveMQStreamMessage* nonConstSrc = const_cast<ActiveMQStreamMessage*> (srcPtr);
     nonConstSrc->storeContent();
 
-    ActiveMQMessageTemplate<cms::StreamMessage>::copyDataStructure( src );
+    ActiveMQMessageTemplate<cms::StreamMessage>::copyDataStructure(src);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-std::string ActiveMQStreamMessage::toString() const{
+std::string ActiveMQStreamMessage::toString() const {
     return ActiveMQMessageTemplate<cms::StreamMessage>::toString();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQStreamMessage::equals( const DataStructure* value ) const {
-    return ActiveMQMessageTemplate<cms::StreamMessage>::equals( value );
+bool ActiveMQStreamMessage::equals(const DataStructure* value) const {
+    return ActiveMQMessageTemplate<cms::StreamMessage>::equals(value);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -118,10 +148,10 @@ void ActiveMQStreamMessage::clearBody() 
     // Invoke base class's version.
     ActiveMQMessageTemplate<cms::StreamMessage>::clearBody();
 
-    this->dataIn.reset( NULL );
-    this->dataOut.reset( NULL );
-    this->bytesOut = NULL;
-    this->remainingBytes = -1;
+    this->dataIn.reset(NULL);
+    this->dataOut.reset(NULL);
+    this->impl->bytesOut = NULL;
+    this->impl->remainingBytes = -1;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -134,12 +164,12 @@ void ActiveMQStreamMessage::onSend() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQStreamMessage::reset() {
 
-    try{
+    try {
         storeContent();
-        this->bytesOut = NULL;
+        this->impl->bytesOut = NULL;
         this->dataIn.reset(NULL);
         this->dataOut.reset(NULL);
-        this->remainingBytes = -1;
+        this->impl->remainingBytes = -1;
         this->setReadOnlyBody(true);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -148,7 +178,7 @@ void ActiveMQStreamMessage::reset() {
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQStreamMessage::readBoolean() const {
 
-    try{
+    try {
 
         initializeReading();
 
@@ -156,40 +186,40 @@ bool ActiveMQStreamMessage::readBoolean(
 
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "Reached the end of the Stream", NULL );
+        if (type == -1) {
+            throw MessageEOFException("Reached the end of the Stream", NULL);
         }
-        if( type == PrimitiveValueNode::BOOLEAN_TYPE ) {
+        if (type == PrimitiveValueNode::BOOLEAN_TYPE) {
             return this->dataIn->readBoolean();
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return Boolean::valueOf( this->dataIn->readUTF() ).booleanValue();
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return Boolean::valueOf(this->dataIn->readUTF()).booleanValue();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE ) {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to boolean." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to boolean.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( "not a boolean type", NULL );
+            throw MessageFormatException("not a boolean type", NULL);
         }
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeBoolean( bool value ) {
+void ActiveMQStreamMessage::writeBoolean(bool value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::BOOLEAN_TYPE );
-        this->dataOut->writeBoolean( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::BOOLEAN_TYPE);
+        this->dataOut->writeBoolean(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -203,137 +233,137 @@ unsigned char ActiveMQStreamMessage::rea
         this->dataIn->mark(10);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::BYTE_TYPE ) {
+        if (type == PrimitiveValueNode::BYTE_TYPE) {
             return this->dataIn->readByte();
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return Byte::valueOf( this->dataIn->readUTF() ).byteValue();
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return Byte::valueOf(this->dataIn->readUTF()).byteValue();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE ) {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to byte." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to byte.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a byte type", NULL );
+            throw MessageFormatException(" not a byte type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& e ) {
+        } catch (IOException& e) {
             throw CMSExceptionSupport::create(e);
         }
 
-        throw CMSExceptionSupport::createMessageFormatException( ex );
+        throw CMSExceptionSupport::createMessageFormatException(ex);
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeByte( unsigned char value ) {
+void ActiveMQStreamMessage::writeByte(unsigned char value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::BYTE_TYPE );
-        this->dataOut->writeByte( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::BYTE_TYPE);
+        this->dataOut->writeByte(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int ActiveMQStreamMessage::readBytes( std::vector<unsigned char>& value ) const {
+int ActiveMQStreamMessage::readBytes(std::vector<unsigned char>& value) const {
 
-    if( value.size() == 0 ) {
+    if (value.size() == 0) {
         return 0;
     }
 
-    return this->readBytes( &value[0], (int)value.size() );
+    return this->readBytes(&value[0], (int) value.size());
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeBytes( const std::vector<unsigned char>& value ) {
+void ActiveMQStreamMessage::writeBytes(const std::vector<unsigned char>& value) {
 
     initializeWriting();
-    try{
+    try {
 
-        int size = (int)value.size();
-        this->dataOut->write( PrimitiveValueNode::BYTE_ARRAY_TYPE );
-        this->dataOut->writeInt( (int)size );
-        this->dataOut->write( &value[0], size, 0, size );
+        int size = (int) value.size();
+        this->dataOut->write(PrimitiveValueNode::BYTE_ARRAY_TYPE);
+        this->dataOut->writeInt((int) size);
+        this->dataOut->write(&value[0], size, 0, size);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-int ActiveMQStreamMessage::readBytes( unsigned char* buffer, int length ) const {
+int ActiveMQStreamMessage::readBytes(unsigned char* buffer, int length) const {
 
     initializeReading();
     try {
 
-        if( buffer == NULL ) {
-            throw NullPointerException( __FILE__, __LINE__, "Passed buffer was NULL" );
+        if (buffer == NULL) {
+            throw NullPointerException(__FILE__, __LINE__, "Passed buffer was NULL");
         }
 
-        if( this->remainingBytes == -1 ) {
+        if (this->impl->remainingBytes == -1) {
 
-            this->dataIn->mark( (int)length + 1 );
+            this->dataIn->mark((int) length + 1);
             int type = this->dataIn->read();
 
-            if( type == -1 ) {
-                throw MessageEOFException( "reached end of data", NULL );
+            if (type == -1) {
+                throw MessageEOFException("reached end of data", NULL);
             }
 
-            if( type != PrimitiveValueNode::BYTE_ARRAY_TYPE ) {
-                throw MessageFormatException( "Not a byte array", NULL );
+            if (type != PrimitiveValueNode::BYTE_ARRAY_TYPE) {
+                throw MessageFormatException("Not a byte array", NULL);
             }
 
-            this->remainingBytes = this->dataIn->readInt();
+            this->impl->remainingBytes = this->dataIn->readInt();
 
-        } else if( this->remainingBytes == 0 ) {
-            remainingBytes = -1;
+        } else if (this->impl->remainingBytes == 0) {
+            this->impl->remainingBytes = -1;
             return -1;
         }
 
-        if( length <= this->remainingBytes ) {
+        if (length <= this->impl->remainingBytes) {
             // small buffer
-            this->remainingBytes -= (int)length;
-            this->dataIn->readFully( buffer, length );
+            this->impl->remainingBytes -= (int) length;
+            this->dataIn->readFully(buffer, length);
             return length;
         } else {
             // big buffer
-            int rc = this->dataIn->read( buffer, length, 0, this->remainingBytes );
-            this->remainingBytes = 0;
+            int rc = this->dataIn->read(buffer, length, 0, this->impl->remainingBytes);
+            this->impl->remainingBytes = 0;
             return rc;
         }
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeBytes( const unsigned char* value, int offset, int length ) {
+void ActiveMQStreamMessage::writeBytes(const unsigned char* value, int offset, int length) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::BYTE_ARRAY_TYPE );
-        this->dataOut->writeInt( (int)length );
-        this->dataOut->write( value, length, offset, length );
+    try {
+        this->dataOut->write(PrimitiveValueNode::BYTE_ARRAY_TYPE);
+        this->dataOut->writeInt((int) length);
+        this->dataOut->write(value, length, offset, length);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -344,50 +374,51 @@ char ActiveMQStreamMessage::readChar() c
     initializeReading();
     try {
 
-        this->dataIn->mark( 17 );
+        this->dataIn->mark(17);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::CHAR_TYPE ) {
+        if (type == PrimitiveValueNode::CHAR_TYPE) {
             return this->dataIn->readChar();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE ) {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to char." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to char.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a char type", NULL );
+            throw MessageFormatException(" not a char type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& ioe ) {
-            throw CMSExceptionSupport::create( ioe );
+        } catch (IOException& ioe) {
+            throw CMSExceptionSupport::create(ioe);
         }
 
-        throw CMSExceptionSupport::create( ex );;
+        throw CMSExceptionSupport::create(ex);
+        ;
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeChar( char value ) {
+void ActiveMQStreamMessage::writeChar(char value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::CHAR_TYPE );
-        this->dataOut->writeChar( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::CHAR_TYPE);
+        this->dataOut->writeChar(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -398,53 +429,53 @@ float ActiveMQStreamMessage::readFloat()
     initializeReading();
     try {
 
-        this->dataIn->mark( 33 );
+        this->dataIn->mark(33);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::FLOAT_TYPE ) {
+        if (type == PrimitiveValueNode::FLOAT_TYPE) {
             return this->dataIn->readFloat();
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return Float::valueOf( this->dataIn->readUTF() ).floatValue();
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return Float::valueOf(this->dataIn->readUTF()).floatValue();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE )  {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to float." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to float.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a float type", NULL );
+            throw MessageFormatException(" not a float type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& ioe ) {
-            throw CMSExceptionSupport::create( ioe );
+        } catch (IOException& ioe) {
+            throw CMSExceptionSupport::create(ioe);
         }
 
-        throw CMSExceptionSupport::create( ex );
+        throw CMSExceptionSupport::create(ex);
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeFloat( float value ) {
+void ActiveMQStreamMessage::writeFloat(float value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::FLOAT_TYPE );
-        this->dataOut->writeFloat( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::FLOAT_TYPE);
+        this->dataOut->writeFloat(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -455,56 +486,56 @@ double ActiveMQStreamMessage::readDouble
     initializeReading();
     try {
 
-        this->dataIn->mark( 33 );
+        this->dataIn->mark(33);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::DOUBLE_TYPE ) {
+        if (type == PrimitiveValueNode::DOUBLE_TYPE) {
             return this->dataIn->readDouble();
         }
-        if( type == PrimitiveValueNode::FLOAT_TYPE ) {
+        if (type == PrimitiveValueNode::FLOAT_TYPE) {
             return this->dataIn->readFloat();
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return Double::valueOf( this->dataIn->readUTF() ).doubleValue();
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return Double::valueOf(this->dataIn->readUTF()).doubleValue();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE )  {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to double." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to double.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a double type", NULL );
+            throw MessageFormatException(" not a double type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& ioe ) {
-            throw CMSExceptionSupport::create( ioe );
+        } catch (IOException& ioe) {
+            throw CMSExceptionSupport::create(ioe);
         }
 
-        throw CMSExceptionSupport::create( ex );
+        throw CMSExceptionSupport::create(ex);
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeDouble( double value ) {
+void ActiveMQStreamMessage::writeDouble(double value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::DOUBLE_TYPE );
-        this->dataOut->writeDouble( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::DOUBLE_TYPE);
+        this->dataOut->writeDouble(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -518,53 +549,53 @@ short ActiveMQStreamMessage::readShort()
         this->dataIn->mark(17);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::SHORT_TYPE ) {
+        if (type == PrimitiveValueNode::SHORT_TYPE) {
             return this->dataIn->readShort();
         }
-        if( type == PrimitiveValueNode::BYTE_TYPE ) {
+        if (type == PrimitiveValueNode::BYTE_TYPE) {
             return this->dataIn->readByte();
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return Short::valueOf( this->dataIn->readUTF() ).shortValue();
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return Short::valueOf(this->dataIn->readUTF()).shortValue();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE ) {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to short." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to short.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a short type", NULL );
+            throw MessageFormatException(" not a short type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& e ) {
+        } catch (IOException& e) {
             throw CMSExceptionSupport::create(e);
         }
 
-        throw CMSExceptionSupport::create( ex );
+        throw CMSExceptionSupport::create(ex);
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeShort( short value ) {
+void ActiveMQStreamMessage::writeShort(short value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::SHORT_TYPE );
-        this->dataOut->writeShort( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::SHORT_TYPE);
+        this->dataOut->writeShort(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -578,53 +609,53 @@ unsigned short ActiveMQStreamMessage::re
         this->dataIn->mark(17);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::SHORT_TYPE ) {
+        if (type == PrimitiveValueNode::SHORT_TYPE) {
             return this->dataIn->readUnsignedShort();
         }
-        if( type == PrimitiveValueNode::BYTE_TYPE ) {
+        if (type == PrimitiveValueNode::BYTE_TYPE) {
             return this->dataIn->readByte();
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return Short::valueOf( this->dataIn->readUTF() ).shortValue();
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return Short::valueOf(this->dataIn->readUTF()).shortValue();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE ) {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to short." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to short.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a short type", NULL );
+            throw MessageFormatException(" not a short type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& e ) {
+        } catch (IOException& e) {
             throw CMSExceptionSupport::create(e);
         }
 
-        throw CMSExceptionSupport::create( ex );
+        throw CMSExceptionSupport::create(ex);
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeUnsignedShort( unsigned short value ) {
+void ActiveMQStreamMessage::writeUnsignedShort(unsigned short value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::SHORT_TYPE );
-        this->dataOut->writeUnsignedShort( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::SHORT_TYPE);
+        this->dataOut->writeUnsignedShort(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -638,56 +669,56 @@ int ActiveMQStreamMessage::readInt() con
         this->dataIn->mark(33);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::INTEGER_TYPE ) {
+        if (type == PrimitiveValueNode::INTEGER_TYPE) {
             return this->dataIn->readInt();
         }
-        if( type == PrimitiveValueNode::SHORT_TYPE ) {
+        if (type == PrimitiveValueNode::SHORT_TYPE) {
             return this->dataIn->readShort();
         }
-        if( type == PrimitiveValueNode::BYTE_TYPE ) {
+        if (type == PrimitiveValueNode::BYTE_TYPE) {
             return this->dataIn->readByte();
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return Integer::valueOf( this->dataIn->readUTF() ).intValue();
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return Integer::valueOf(this->dataIn->readUTF()).intValue();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE ) {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to int." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to int.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a int type", NULL );
+            throw MessageFormatException(" not a int type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& e ) {
+        } catch (IOException& e) {
             throw CMSExceptionSupport::create(e);
         }
 
-        throw CMSExceptionSupport::create( ex );
+        throw CMSExceptionSupport::create(ex);
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeInt( int value ) {
+void ActiveMQStreamMessage::writeInt(int value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::INTEGER_TYPE );
-        this->dataOut->writeInt( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::INTEGER_TYPE);
+        this->dataOut->writeInt(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -701,59 +732,59 @@ long long ActiveMQStreamMessage::readLon
         this->dataIn->mark(65);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::LONG_TYPE ) {
+        if (type == PrimitiveValueNode::LONG_TYPE) {
             return this->dataIn->readLong();
         }
-        if( type == PrimitiveValueNode::INTEGER_TYPE ) {
+        if (type == PrimitiveValueNode::INTEGER_TYPE) {
             return this->dataIn->readInt();
         }
-        if( type == PrimitiveValueNode::SHORT_TYPE ) {
+        if (type == PrimitiveValueNode::SHORT_TYPE) {
             return this->dataIn->readShort();
         }
-        if( type == PrimitiveValueNode::BYTE_TYPE ) {
+        if (type == PrimitiveValueNode::BYTE_TYPE) {
             return this->dataIn->readByte();
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return Long::valueOf( this->dataIn->readUTF() ).longValue();
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return Long::valueOf(this->dataIn->readUTF()).longValue();
         }
 
-        if( type == PrimitiveValueNode::NULL_TYPE ) {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             this->dataIn->reset();
-            throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to long." );
+            throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to long.");
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a long type", NULL );
+            throw MessageFormatException(" not a long type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& e ) {
+        } catch (IOException& e) {
             throw CMSExceptionSupport::create(e);
         }
 
-        throw CMSExceptionSupport::create( ex );
+        throw CMSExceptionSupport::create(ex);
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeLong( long long value ) {
+void ActiveMQStreamMessage::writeLong(long long value) {
 
     initializeWriting();
-    try{
-        this->dataOut->write( PrimitiveValueNode::LONG_TYPE );
-        this->dataOut->writeLong( value );
+    try {
+        this->dataOut->write(PrimitiveValueNode::LONG_TYPE);
+        this->dataOut->writeLong(value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -764,94 +795,151 @@ std::string ActiveMQStreamMessage::readS
     initializeReading();
     try {
 
-        this->dataIn->mark( 65 );
+        this->dataIn->mark(65);
         int type = this->dataIn->read();
 
-        if( type == -1 ) {
-            throw MessageEOFException( "reached end of data", NULL );
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
         }
-        if( type == PrimitiveValueNode::NULL_TYPE ) {
+        if (type == PrimitiveValueNode::NULL_TYPE) {
             return "";
         }
-        if( type == PrimitiveValueNode::BIG_STRING_TYPE ) {
-            return MarshallingSupport::readString32( *this->dataIn );
+        if (type == PrimitiveValueNode::BIG_STRING_TYPE) {
+            return MarshallingSupport::readString32(*this->dataIn);
         }
-        if( type == PrimitiveValueNode::STRING_TYPE ) {
-            return MarshallingSupport::readString16( *this->dataIn );
+        if (type == PrimitiveValueNode::STRING_TYPE) {
+            return MarshallingSupport::readString16(*this->dataIn);
         }
-        if( type == PrimitiveValueNode::LONG_TYPE ) {
-            return Long( this->dataIn->readLong() ).toString();
+        if (type == PrimitiveValueNode::LONG_TYPE) {
+            return Long(this->dataIn->readLong()).toString();
         }
-        if( type == PrimitiveValueNode::INTEGER_TYPE ) {
-            return Integer( this->dataIn->readInt() ).toString();
+        if (type == PrimitiveValueNode::INTEGER_TYPE) {
+            return Integer(this->dataIn->readInt()).toString();
         }
-        if( type == PrimitiveValueNode::SHORT_TYPE ) {
-            return Short( this->dataIn->readShort() ).toString();
+        if (type == PrimitiveValueNode::SHORT_TYPE) {
+            return Short(this->dataIn->readShort()).toString();
         }
-        if( type == PrimitiveValueNode::BYTE_TYPE ) {
-            return Byte( this->dataIn->readByte() ).toString();
+        if (type == PrimitiveValueNode::BYTE_TYPE) {
+            return Byte(this->dataIn->readByte()).toString();
         }
-        if( type == PrimitiveValueNode::FLOAT_TYPE ) {
-            return Float( this->dataIn->readFloat() ).toString();
+        if (type == PrimitiveValueNode::FLOAT_TYPE) {
+            return Float(this->dataIn->readFloat()).toString();
         }
-        if( type == PrimitiveValueNode::DOUBLE_TYPE ) {
-            return Double( this->dataIn->readDouble() ).toString();
+        if (type == PrimitiveValueNode::DOUBLE_TYPE) {
+            return Double(this->dataIn->readDouble()).toString();
         }
-        if( type == PrimitiveValueNode::BOOLEAN_TYPE ) {
-            return ( this->dataIn->readBoolean() ? Boolean::_TRUE : Boolean::_FALSE ).toString();
+        if (type == PrimitiveValueNode::BOOLEAN_TYPE) {
+            return (this->dataIn->readBoolean() ? Boolean::_TRUE : Boolean::_FALSE).toString();
         }
 
-        if( type == PrimitiveValueNode::CHAR_TYPE ) {
-            return Character( this->dataIn->readChar() ).toString();
+        if (type == PrimitiveValueNode::CHAR_TYPE) {
+            return Character(this->dataIn->readChar()).toString();
         } else {
             this->dataIn->reset();
-            throw MessageFormatException( " not a String type", NULL );
+            throw MessageFormatException(" not a String type", NULL);
         }
 
-    } catch( NumberFormatException& ex ) {
+    } catch (NumberFormatException& ex) {
 
         try {
             this->dataIn->reset();
-        } catch( IOException& ioe ) {
-            throw CMSExceptionSupport::create( ioe );
+        } catch (IOException& ioe) {
+            throw CMSExceptionSupport::create(ioe);
         }
 
-        throw CMSExceptionSupport::create( ex );
+        throw CMSExceptionSupport::create(ex);
 
-    } catch( EOFException& e ) {
-        throw CMSExceptionSupport::createMessageEOFException( e );
-    } catch( IOException& e ) {
-        throw CMSExceptionSupport::createMessageFormatException( e );
-    } catch( Exception& e ) {
-        throw CMSExceptionSupport::create( e );
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeString( const std::string& value ) {
+void ActiveMQStreamMessage::writeString(const std::string& value) {
 
     initializeWriting();
-    try{
-        MarshallingSupport::writeString( *this->dataOut, value );
+    try {
+        MarshallingSupport::writeString(*this->dataOut, value);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+cms::Message::ValueType ActiveMQStreamMessage::getNextValueType() const {
+    // Peeks at the type marker of the next stream element and translates it to the CMS
+    // value type; the stream position is restored so the element itself stays unread.
+    initializeReading();
+    try {
+        // remainingBytes != -1 means a byte array read is still in progress (see message below).
+        if (this->impl->remainingBytes != -1) {
+            throw cms::IllegalStateException(
+                "Cannot read the next type during an byte array read operation, complete the read first.");
+        }
+
+        this->dataIn->mark(10);
+        int type = this->dataIn->read();
+        if (type == -1) {
+            throw MessageEOFException("reached end of data", NULL);
+        }
+        this->dataIn->reset();  // un-read the marker so the typed read methods see it again
+
+        switch(type) {
+            case util::PrimitiveValueNode::NULL_TYPE:
+                return cms::Message::NULL_TYPE;
+            case util::PrimitiveValueNode::BOOLEAN_TYPE:
+                return cms::Message::BOOLEAN_TYPE;
+            case util::PrimitiveValueNode::BYTE_TYPE:
+                return cms::Message::BYTE_TYPE;
+            case util::PrimitiveValueNode::BYTE_ARRAY_TYPE:
+                return cms::Message::BYTE_ARRAY_TYPE;
+            case util::PrimitiveValueNode::CHAR_TYPE:
+                return cms::Message::CHAR_TYPE;
+            case util::PrimitiveValueNode::SHORT_TYPE:
+                return cms::Message::SHORT_TYPE;
+            case util::PrimitiveValueNode::INTEGER_TYPE:
+                return cms::Message::INTEGER_TYPE;
+            case util::PrimitiveValueNode::LONG_TYPE:
+                return cms::Message::LONG_TYPE;
+            case util::PrimitiveValueNode::DOUBLE_TYPE:
+                return cms::Message::DOUBLE_TYPE;
+            case util::PrimitiveValueNode::FLOAT_TYPE:
+                return cms::Message::FLOAT_TYPE;
+            case util::PrimitiveValueNode::STRING_TYPE:
+            case util::PrimitiveValueNode::BIG_STRING_TYPE:
+                return cms::Message::STRING_TYPE;
+            default:
+                throw MessageFormatException("Unknown type found in stream", NULL);
+        }
+
+        return cms::Message::UNKNOWN_TYPE;  // not reached: every switch path returns or throws
+    } catch (EOFException& e) {
+        throw CMSExceptionSupport::createMessageEOFException(e);
+    } catch (IOException& e) {
+        throw CMSExceptionSupport::createMessageFormatException(e);
+    } catch (Exception& e) {
+        throw CMSExceptionSupport::create(e);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQStreamMessage::storeContent() {
 
-    if( this->dataOut.get() != NULL) {
+    if (this->dataOut.get() != NULL) {
 
         this->dataOut->close();
 
-        if( this->bytesOut->size() > 0 ) {
-            std::pair<unsigned char*, int> array = this->bytesOut->toByteArray();
-            this->setContent( std::vector<unsigned char>( array.first, array.first + array.second ) );
-            delete [] array.first;
+        if (this->impl->bytesOut->size() > 0) {
+            std::pair<unsigned char*, int> array = this->impl->bytesOut->toByteArray();
+            this->setContent(std::vector<unsigned char>(array.first, array.first + array.second));
+            delete[] array.first;
         }
 
-        this->dataOut.reset( NULL );
-        this->bytesOut = NULL;
+        this->dataOut.reset(NULL);
+        this->impl->bytesOut = NULL;
     }
 }
 
@@ -860,15 +948,15 @@ void ActiveMQStreamMessage::initializeRe
 
     this->failIfWriteOnlyBody();
     try {
-        if( this->dataIn.get() == NULL) {
-            InputStream* is = new ByteArrayInputStream( this->getContent() );
+        if (this->dataIn.get() == NULL) {
+            InputStream* is = new ByteArrayInputStream(this->getContent());
 
-            if( isCompressed() ) {
-                is = new InflaterInputStream( is, true );
-                is = new BufferedInputStream( is, true );
+            if (isCompressed()) {
+                is = new InflaterInputStream(is, true);
+                is = new BufferedInputStream(is, true);
             }
 
-            this->dataIn.reset( new DataInputStream( is, true ) );
+            this->dataIn.reset(new DataInputStream(is, true));
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -878,21 +966,21 @@ void ActiveMQStreamMessage::initializeRe
 void ActiveMQStreamMessage::initializeWriting() {
 
     this->failIfReadOnlyBody();
-    try{
-        if( this->dataOut.get() == NULL ) {
-            this->bytesOut = new ByteArrayOutputStream();
+    try {
+        if (this->dataOut.get() == NULL) {
+            this->impl->bytesOut = new ByteArrayOutputStream();
 
-            OutputStream* os = this->bytesOut;
+            OutputStream* os = this->impl->bytesOut;
 
-            if( this->connection != NULL && this->connection->isUseCompression() ) {
+            if (this->connection != NULL && this->connection->isUseCompression()) {
                 this->compressed = true;
 
-                Deflater* deflator = new Deflater( this->connection->getCompressionLevel() );
+                Deflater* deflator = new Deflater(this->connection->getCompressionLevel());
 
-                os = new DeflaterOutputStream( os, deflator, true, true );
+                os = new DeflaterOutputStream(os, deflator, true, true);
             }
 
-            this->dataOut.reset( new DataOutputStream( os, true ) );
+            this->dataOut.reset(new DataOutputStream(os, true));
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h Mon Jun 11 15:45:23 2012
@@ -34,108 +34,109 @@
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/io/DataInputStream.h>
 #include <decaf/io/DataOutputStream.h>
-#include <decaf/io/ByteArrayOutputStream.h>
 #include <string>
 #include <memory>
 
-namespace activemq{
-namespace commands{
+namespace activemq {
+namespace commands {
 
-    class AMQCPP_API ActiveMQStreamMessage :
-        public ActiveMQMessageTemplate< cms::StreamMessage > {
+    class ActiveMQStreamMessageImpl;
+
+    class AMQCPP_API ActiveMQStreamMessage: public ActiveMQMessageTemplate<cms::StreamMessage> {
     private:
 
-        decaf::io::ByteArrayOutputStream* bytesOut;
+        ActiveMQStreamMessageImpl* impl;
+
         mutable std::auto_ptr<decaf::io::DataInputStream> dataIn;
         mutable std::auto_ptr<decaf::io::DataOutputStream> dataOut;
 
-        mutable int remainingBytes;
-
     public:
 
         const static unsigned char ID_ACTIVEMQSTREAMMESSAGE = 27;
 
     private:
 
-        ActiveMQStreamMessage( const ActiveMQStreamMessage& );
-        ActiveMQStreamMessage& operator= ( const ActiveMQStreamMessage& );
+        ActiveMQStreamMessage(const ActiveMQStreamMessage&);
+        ActiveMQStreamMessage& operator=(const ActiveMQStreamMessage&);
 
     public:
 
         ActiveMQStreamMessage();
-        virtual ~ActiveMQStreamMessage() throw();
+        virtual ~ActiveMQStreamMessage() throw ();
 
         virtual unsigned char getDataStructureType() const;
 
         virtual ActiveMQStreamMessage* cloneDataStructure() const;
 
-        virtual void copyDataStructure( const DataStructure* src );
+        virtual void copyDataStructure(const DataStructure* src);
 
         virtual std::string toString() const;
 
-        virtual bool equals( const DataStructure* value ) const;
+        virtual bool equals(const DataStructure* value) const;
 
         virtual void onSend();
 
-    public:   // CMS Message
+    public: // CMS Message
 
         virtual cms::StreamMessage* clone() const {
-            return dynamic_cast<cms::StreamMessage*>( this->cloneDataStructure() );
+            return dynamic_cast<cms::StreamMessage*> (this->cloneDataStructure());
         }
 
         virtual void clearBody();
 
     public: // CMS Stream Message
 
+        virtual ValueType getNextValueType() const;
+
         virtual void reset();
 
         virtual bool readBoolean() const;
 
-        virtual void writeBoolean( bool value );
+        virtual void writeBoolean(bool value);
 
         virtual unsigned char readByte() const;
 
-        virtual void writeByte( unsigned char value );
+        virtual void writeByte(unsigned char value);
 
-        virtual int readBytes( std::vector<unsigned char>& value ) const;
+        virtual int readBytes(std::vector<unsigned char>& value) const;
 
-        virtual void writeBytes( const std::vector<unsigned char>& value );
+        virtual void writeBytes(const std::vector<unsigned char>& value);
 
-        virtual int readBytes( unsigned char* buffer, int length ) const;
+        virtual int readBytes(unsigned char* buffer, int length) const;
 
-        virtual void writeBytes( const unsigned char* value, int offset, int length );
+        virtual void writeBytes(const unsigned char* value, int offset, int length);
 
         virtual char readChar() const;
 
-        virtual void writeChar( char value );
+        virtual void writeChar(char value);
 
         virtual float readFloat() const;
 
-        virtual void writeFloat( float value );
+        virtual void writeFloat(float value);
 
         virtual double readDouble() const;
 
-        virtual void writeDouble( double value );
+        virtual void writeDouble(double value);
 
         virtual short readShort() const;
 
-        virtual void writeShort( short value );
+        virtual void writeShort(short value);
 
         virtual unsigned short readUnsignedShort() const;
 
-        virtual void writeUnsignedShort( unsigned short value );
+        virtual void writeUnsignedShort(unsigned short value);
 
         virtual int readInt() const;
 
-        virtual void writeInt( int value );
+        virtual void writeInt(int value);
 
         virtual long long readLong() const;
 
-        virtual void writeLong( long long value );
+        virtual void writeLong(long long value);
 
         virtual std::string readString() const;
 
-        virtual void writeString( const std::string& value );
+        virtual void writeString(const std::string& value);
 
     private:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp Mon Jun 11 15:45:23 2012
@@ -186,15 +186,54 @@ bool ActiveMQMessageTransformation::tran
             ActiveMQStreamMessage* msg = new ActiveMQStreamMessage();
             msg->setConnection(connection);
 
-            // TODO Need element enumeration for StreamMessage
-//            try {
-//                while ((obj = streamMessage->readObject()) != NULL) {
-//                    msg->writeObject(obj);
-//                }
-//            } catch (MessageEOFException e) {
-//                // if an end of message stream as expected
-//            } catch (JMSException e) {
-//            }
+            try {
+                for (bool moreElements = true; moreElements;) {  // copies one stream element per pass
+                    cms::Message::ValueType elementType = streamMessage->getNextValueType();
+                    int result = -1;
+                    std::vector<unsigned char> buffer(255);  // chunk buffer; must keep its size across readBytes calls
+                    switch(elementType) {
+                        case cms::Message::BOOLEAN_TYPE:
+                            msg->writeBoolean(streamMessage->readBoolean());
+                            break;
+                        case cms::Message::BYTE_TYPE:
+                            msg->writeByte(streamMessage->readByte());
+                            break;
+                        case cms::Message::BYTE_ARRAY_TYPE:
+                            while ((result = streamMessage->readBytes(buffer)) != -1) {
+                                msg->writeBytes(&buffer[0], 0, result);
+                            }
+                            break;
+                        case cms::Message::CHAR_TYPE:
+                            msg->writeChar(streamMessage->readChar());
+                            break;
+                        case cms::Message::SHORT_TYPE:
+                            msg->writeShort(streamMessage->readShort());
+                            break;
+                        case cms::Message::INTEGER_TYPE:
+                            msg->writeInt(streamMessage->readInt());
+                            break;
+                        case cms::Message::LONG_TYPE:
+                            msg->writeLong(streamMessage->readLong());
+                            break;
+                        case cms::Message::FLOAT_TYPE:
+                            msg->writeFloat(streamMessage->readFloat());
+                            break;
+                        case cms::Message::DOUBLE_TYPE:
+                            msg->writeDouble(streamMessage->readDouble());
+                            break;
+                        case cms::Message::STRING_TYPE:
+                            msg->writeString(streamMessage->readString());
+                            break;
+                        default:
+                            // Nothing here consumes a NULL/UNKNOWN element, so stop rather than peek at it forever.
+                            moreElements = false;
+                            break;
+                    }
+                }
+            } catch (cms::MessageEOFException& e) {
+                // Reaching the end of the source stream is the expected way out of the copy loop.
+            } catch (cms::CMSException& e) {  // NOTE(review): other CMS errors are swallowed; the copy may be partial
+            }
 
             *amqMessage = msg;
         } else if (dynamic_cast<cms::TextMessage*>(message) != NULL) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h Mon Jun 11 15:45:23 2012
@@ -64,6 +64,22 @@ namespace cms {
         virtual ~StreamMessage();
 
         /**
+         * Returns the value type for the element in the StreamMessage.  The CMS provider
+         * should translate all internal type identifiers into the CMS Value types returning
+         * UNKNOWN_TYPE for any specialized types not directly supported in the CMS API.
+         * The call can fail if the StreamMessage is currently in the middle of a read of
+         * a byte array.
+         *
+         * @returns The ValueType contained in the next message element.
+         *
+         * @throw CMSException - if the provider fails to determine the next value type due to an internal error.
+         * @throw MessageEOFException - if unexpected end of message stream has been reached.
+         * @throw MessageFormatException - if the message contains invalid data.
+         * @throw MessageNotReadableException - if the message is in write-only mode.
+         */
+        virtual ValueType getNextValueType() const = 0;
+
+        /**
          * Reads a Boolean from the Stream message stream
          * @returns boolean value from stream
          *

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp Mon Jun 11 15:45:23 2012
@@ -77,14 +77,23 @@ void ActiveMQStreamMessageTest::testSetA
 
     myMessage.reset();
 
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::BOOLEAN_TYPE );
     CPPUNIT_ASSERT( myMessage.readBoolean() == false );
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::BYTE_TYPE );
     CPPUNIT_ASSERT( myMessage.readByte() == 127 );
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::CHAR_TYPE );
     CPPUNIT_ASSERT( myMessage.readChar() == 'a' );
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::SHORT_TYPE );
     CPPUNIT_ASSERT( myMessage.readShort() == 32000 );
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::INTEGER_TYPE );
     CPPUNIT_ASSERT( myMessage.readInt() == 6789999 );
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::LONG_TYPE );
     CPPUNIT_ASSERT( myMessage.readLong() == 0xFFFAAA33345LL );
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::FLOAT_TYPE );
     CPPUNIT_ASSERT( myMessage.readFloat() == 0.000012f );
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::DOUBLE_TYPE );
     CPPUNIT_ASSERT( myMessage.readDouble() == 64.54654 );
+    CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::BYTE_ARRAY_TYPE );
     CPPUNIT_ASSERT( myMessage.readBytes( readData ) == (int)data.size() );
 }
 
@@ -898,6 +907,11 @@ void ActiveMQStreamMessageTest::testWrit
         CPPUNIT_FAIL("Should be writeable");
     }
     try {
+        message.getNextValueType();
+        CPPUNIT_FAIL("Should have thrown exception");
+    } catch( MessageNotReadableException& mnwe ) {
+    }
+    try {
         message.readBoolean();
         CPPUNIT_FAIL("Should have thrown exception");
     } catch( MessageNotReadableException& mnwe ) {