You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2012/06/11 17:45:24 UTC
svn commit: r1348915 - in /activemq/activemq-cpp/trunk/activemq-cpp/src:
main/activemq/commands/ main/activemq/util/ main/cms/ test/activemq/commands/
Author: tabish
Date: Mon Jun 11 15:45:23 2012
New Revision: 1348915
URL: http://svn.apache.org/viewvc?rev=1348915&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-411
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQMessageTemplate.h Mon Jun 11 15:45:23 2012
@@ -146,6 +146,8 @@ namespace commands {
case util::PrimitiveValueNode::STRING_TYPE:
case util::PrimitiveValueNode::BIG_STRING_TYPE:
return cms::Message::STRING_TYPE;
+ case util::PrimitiveValueNode::BYTE_ARRAY_TYPE:
+ return cms::Message::BYTE_ARRAY_TYPE;
default:
break;
}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.cpp Mon Jun 11 15:45:23 2012
@@ -36,6 +36,7 @@
#include <decaf/lang/Long.h>
#include <decaf/lang/Double.h>
#include <decaf/lang/Float.h>
+#include <decaf/io/ByteArrayOutputStream.h>
#include <decaf/io/ByteArrayInputStream.h>
#include <decaf/io/BufferedInputStream.h>
#include <decaf/util/zip/DeflaterOutputStream.h>
@@ -57,15 +58,46 @@ using namespace decaf::util;
using namespace decaf::util::zip;
////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace commands {
+
+ class ActiveMQStreamMessageImpl {
+ private:
+
+ ActiveMQStreamMessageImpl(const ActiveMQStreamMessageImpl&);
+ ActiveMQStreamMessageImpl& operator= (const ActiveMQStreamMessageImpl&);
+
+ public:
+
+ ActiveMQStreamMessageImpl() : bytesOut(NULL), remainingBytes(-1) {}
+ ~ActiveMQStreamMessageImpl() {}
+
+ public:
+
+ // Holds the contents of the message once written.
+ decaf::io::ByteArrayOutputStream* bytesOut;
+
+ // When reading an array of bytes this value indicates how many bytes
+ // are left unread since the last readBytes call.
+ mutable int remainingBytes;
+
+ };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
ActiveMQStreamMessage::ActiveMQStreamMessage() :
- ActiveMQMessageTemplate< cms::StreamMessage >(), bytesOut(NULL), dataIn(), dataOut(), remainingBytes(-1) {
+ ActiveMQMessageTemplate<cms::StreamMessage>(), impl(new ActiveMQStreamMessageImpl()), dataIn(), dataOut() {
this->clearBody();
}
////////////////////////////////////////////////////////////////////////////////
-ActiveMQStreamMessage::~ActiveMQStreamMessage() throw() {
- this->reset();
+ActiveMQStreamMessage::~ActiveMQStreamMessage() throw () {
+ try {
+ this->reset();
+ delete impl;
+ }
+ AMQ_CATCHALL_NOTHROW()
}
////////////////////////////////////////////////////////////////////////////////
@@ -75,41 +107,39 @@ unsigned char ActiveMQStreamMessage::get
////////////////////////////////////////////////////////////////////////////////
ActiveMQStreamMessage* ActiveMQStreamMessage::cloneDataStructure() const {
- std::auto_ptr<ActiveMQStreamMessage> message( new ActiveMQStreamMessage() );
- message->copyDataStructure( this );
+ std::auto_ptr<ActiveMQStreamMessage> message(new ActiveMQStreamMessage());
+ message->copyDataStructure(this);
return message.release();
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::copyDataStructure( const DataStructure* src ) {
+void ActiveMQStreamMessage::copyDataStructure(const DataStructure* src) {
// Protect against invalid self assignment.
- if( this == src ) {
+ if (this == src) {
return;
}
- const ActiveMQStreamMessage* srcPtr = dynamic_cast<const ActiveMQStreamMessage*>( src );
+ const ActiveMQStreamMessage* srcPtr = dynamic_cast<const ActiveMQStreamMessage*> (src);
- if( srcPtr == NULL || src == NULL ) {
- throw decaf::lang::exceptions::NullPointerException(
- __FILE__, __LINE__,
- "ActiveMQStreamMessage::copyDataStructure - src is NULL or invalid" );
+ if (srcPtr == NULL || src == NULL) {
+ throw decaf::lang::exceptions::NullPointerException(__FILE__, __LINE__, "ActiveMQStreamMessage::copyDataStructure - src is NULL or invalid");
}
- ActiveMQStreamMessage* nonConstSrc = const_cast<ActiveMQStreamMessage*>( srcPtr );
+ ActiveMQStreamMessage* nonConstSrc = const_cast<ActiveMQStreamMessage*> (srcPtr);
nonConstSrc->storeContent();
- ActiveMQMessageTemplate<cms::StreamMessage>::copyDataStructure( src );
+ ActiveMQMessageTemplate<cms::StreamMessage>::copyDataStructure(src);
}
////////////////////////////////////////////////////////////////////////////////
-std::string ActiveMQStreamMessage::toString() const{
+std::string ActiveMQStreamMessage::toString() const {
return ActiveMQMessageTemplate<cms::StreamMessage>::toString();
}
////////////////////////////////////////////////////////////////////////////////
-bool ActiveMQStreamMessage::equals( const DataStructure* value ) const {
- return ActiveMQMessageTemplate<cms::StreamMessage>::equals( value );
+bool ActiveMQStreamMessage::equals(const DataStructure* value) const {
+ return ActiveMQMessageTemplate<cms::StreamMessage>::equals(value);
}
////////////////////////////////////////////////////////////////////////////////
@@ -118,10 +148,10 @@ void ActiveMQStreamMessage::clearBody()
// Invoke base class's version.
ActiveMQMessageTemplate<cms::StreamMessage>::clearBody();
- this->dataIn.reset( NULL );
- this->dataOut.reset( NULL );
- this->bytesOut = NULL;
- this->remainingBytes = -1;
+ this->dataIn.reset(NULL);
+ this->dataOut.reset(NULL);
+ this->impl->bytesOut = NULL;
+ this->impl->remainingBytes = -1;
}
////////////////////////////////////////////////////////////////////////////////
@@ -134,12 +164,12 @@ void ActiveMQStreamMessage::onSend() {
////////////////////////////////////////////////////////////////////////////////
void ActiveMQStreamMessage::reset() {
- try{
+ try {
storeContent();
- this->bytesOut = NULL;
+ this->impl->bytesOut = NULL;
this->dataIn.reset(NULL);
this->dataOut.reset(NULL);
- this->remainingBytes = -1;
+ this->impl->remainingBytes = -1;
this->setReadOnlyBody(true);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -148,7 +178,7 @@ void ActiveMQStreamMessage::reset() {
////////////////////////////////////////////////////////////////////////////////
bool ActiveMQStreamMessage::readBoolean() const {
- try{
+ try {
initializeReading();
@@ -156,40 +186,40 @@ bool ActiveMQStreamMessage::readBoolean(
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "Reached the end of the Stream", NULL );
+ if (type == -1) {
+ throw MessageEOFException("Reached the end of the Stream", NULL);
}
- if( type == PrimitiveValueNode::BOOLEAN_TYPE ) {
+ if (type == PrimitiveValueNode::BOOLEAN_TYPE) {
return this->dataIn->readBoolean();
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return Boolean::valueOf( this->dataIn->readUTF() ).booleanValue();
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return Boolean::valueOf(this->dataIn->readUTF()).booleanValue();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to boolean." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to boolean.");
} else {
this->dataIn->reset();
- throw MessageFormatException( "not a boolean type", NULL );
+ throw MessageFormatException("not a boolean type", NULL);
}
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeBoolean( bool value ) {
+void ActiveMQStreamMessage::writeBoolean(bool value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::BOOLEAN_TYPE );
- this->dataOut->writeBoolean( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::BOOLEAN_TYPE);
+ this->dataOut->writeBoolean(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -203,137 +233,137 @@ unsigned char ActiveMQStreamMessage::rea
this->dataIn->mark(10);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::BYTE_TYPE ) {
+ if (type == PrimitiveValueNode::BYTE_TYPE) {
return this->dataIn->readByte();
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return Byte::valueOf( this->dataIn->readUTF() ).byteValue();
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return Byte::valueOf(this->dataIn->readUTF()).byteValue();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to byte." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to byte.");
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a byte type", NULL );
+ throw MessageFormatException(" not a byte type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& e ) {
+ } catch (IOException& e) {
throw CMSExceptionSupport::create(e);
}
- throw CMSExceptionSupport::createMessageFormatException( ex );
+ throw CMSExceptionSupport::createMessageFormatException(ex);
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeByte( unsigned char value ) {
+void ActiveMQStreamMessage::writeByte(unsigned char value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::BYTE_TYPE );
- this->dataOut->writeByte( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::BYTE_TYPE);
+ this->dataOut->writeByte(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
-int ActiveMQStreamMessage::readBytes( std::vector<unsigned char>& value ) const {
+int ActiveMQStreamMessage::readBytes(std::vector<unsigned char>& value) const {
- if( value.size() == 0 ) {
+ if (value.size() == 0) {
return 0;
}
- return this->readBytes( &value[0], (int)value.size() );
+ return this->readBytes(&value[0], (int) value.size());
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeBytes( const std::vector<unsigned char>& value ) {
+void ActiveMQStreamMessage::writeBytes(const std::vector<unsigned char>& value) {
initializeWriting();
- try{
+ try {
- int size = (int)value.size();
- this->dataOut->write( PrimitiveValueNode::BYTE_ARRAY_TYPE );
- this->dataOut->writeInt( (int)size );
- this->dataOut->write( &value[0], size, 0, size );
+ int size = (int) value.size();
+ this->dataOut->write(PrimitiveValueNode::BYTE_ARRAY_TYPE);
+ this->dataOut->writeInt((int) size);
+ this->dataOut->write(&value[0], size, 0, size);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
-int ActiveMQStreamMessage::readBytes( unsigned char* buffer, int length ) const {
+int ActiveMQStreamMessage::readBytes(unsigned char* buffer, int length) const {
initializeReading();
try {
- if( buffer == NULL ) {
- throw NullPointerException( __FILE__, __LINE__, "Passed buffer was NULL" );
+ if (buffer == NULL) {
+ throw NullPointerException(__FILE__, __LINE__, "Passed buffer was NULL");
}
- if( this->remainingBytes == -1 ) {
+ if (this->impl->remainingBytes == -1) {
- this->dataIn->mark( (int)length + 1 );
+ this->dataIn->mark((int) length + 1);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type != PrimitiveValueNode::BYTE_ARRAY_TYPE ) {
- throw MessageFormatException( "Not a byte array", NULL );
+ if (type != PrimitiveValueNode::BYTE_ARRAY_TYPE) {
+ throw MessageFormatException("Not a byte array", NULL);
}
- this->remainingBytes = this->dataIn->readInt();
+ this->impl->remainingBytes = this->dataIn->readInt();
- } else if( this->remainingBytes == 0 ) {
- remainingBytes = -1;
+ } else if (this->impl->remainingBytes == 0) {
+ this->impl->remainingBytes = -1;
return -1;
}
- if( length <= this->remainingBytes ) {
+ if (length <= this->impl->remainingBytes) {
// small buffer
- this->remainingBytes -= (int)length;
- this->dataIn->readFully( buffer, length );
+ this->impl->remainingBytes -= (int) length;
+ this->dataIn->readFully(buffer, length);
return length;
} else {
// big buffer
- int rc = this->dataIn->read( buffer, length, 0, this->remainingBytes );
- this->remainingBytes = 0;
+ int rc = this->dataIn->read(buffer, length, 0, this->impl->remainingBytes);
+ this->impl->remainingBytes = 0;
return rc;
}
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeBytes( const unsigned char* value, int offset, int length ) {
+void ActiveMQStreamMessage::writeBytes(const unsigned char* value, int offset, int length) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::BYTE_ARRAY_TYPE );
- this->dataOut->writeInt( (int)length );
- this->dataOut->write( value, length, offset, length );
+ try {
+ this->dataOut->write(PrimitiveValueNode::BYTE_ARRAY_TYPE);
+ this->dataOut->writeInt((int) length);
+ this->dataOut->write(value, length, offset, length);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -344,50 +374,51 @@ char ActiveMQStreamMessage::readChar() c
initializeReading();
try {
- this->dataIn->mark( 17 );
+ this->dataIn->mark(17);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::CHAR_TYPE ) {
+ if (type == PrimitiveValueNode::CHAR_TYPE) {
return this->dataIn->readChar();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to char." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to char.");
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a char type", NULL );
+ throw MessageFormatException(" not a char type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& ioe ) {
- throw CMSExceptionSupport::create( ioe );
+ } catch (IOException& ioe) {
+ throw CMSExceptionSupport::create(ioe);
}
- throw CMSExceptionSupport::create( ex );;
+ throw CMSExceptionSupport::create(ex);
+ ;
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeChar( char value ) {
+void ActiveMQStreamMessage::writeChar(char value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::CHAR_TYPE );
- this->dataOut->writeChar( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::CHAR_TYPE);
+ this->dataOut->writeChar(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -398,53 +429,53 @@ float ActiveMQStreamMessage::readFloat()
initializeReading();
try {
- this->dataIn->mark( 33 );
+ this->dataIn->mark(33);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::FLOAT_TYPE ) {
+ if (type == PrimitiveValueNode::FLOAT_TYPE) {
return this->dataIn->readFloat();
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return Float::valueOf( this->dataIn->readUTF() ).floatValue();
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return Float::valueOf(this->dataIn->readUTF()).floatValue();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to float." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to float.");
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a float type", NULL );
+ throw MessageFormatException(" not a float type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& ioe ) {
- throw CMSExceptionSupport::create( ioe );
+ } catch (IOException& ioe) {
+ throw CMSExceptionSupport::create(ioe);
}
- throw CMSExceptionSupport::create( ex );
+ throw CMSExceptionSupport::create(ex);
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeFloat( float value ) {
+void ActiveMQStreamMessage::writeFloat(float value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::FLOAT_TYPE );
- this->dataOut->writeFloat( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::FLOAT_TYPE);
+ this->dataOut->writeFloat(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -455,56 +486,56 @@ double ActiveMQStreamMessage::readDouble
initializeReading();
try {
- this->dataIn->mark( 33 );
+ this->dataIn->mark(33);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::DOUBLE_TYPE ) {
+ if (type == PrimitiveValueNode::DOUBLE_TYPE) {
return this->dataIn->readDouble();
}
- if( type == PrimitiveValueNode::FLOAT_TYPE ) {
+ if (type == PrimitiveValueNode::FLOAT_TYPE) {
return this->dataIn->readFloat();
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return Double::valueOf( this->dataIn->readUTF() ).doubleValue();
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return Double::valueOf(this->dataIn->readUTF()).doubleValue();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to double." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to double.");
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a double type", NULL );
+ throw MessageFormatException(" not a double type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& ioe ) {
- throw CMSExceptionSupport::create( ioe );
+ } catch (IOException& ioe) {
+ throw CMSExceptionSupport::create(ioe);
}
- throw CMSExceptionSupport::create( ex );
+ throw CMSExceptionSupport::create(ex);
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeDouble( double value ) {
+void ActiveMQStreamMessage::writeDouble(double value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::DOUBLE_TYPE );
- this->dataOut->writeDouble( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::DOUBLE_TYPE);
+ this->dataOut->writeDouble(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -518,53 +549,53 @@ short ActiveMQStreamMessage::readShort()
this->dataIn->mark(17);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::SHORT_TYPE ) {
+ if (type == PrimitiveValueNode::SHORT_TYPE) {
return this->dataIn->readShort();
}
- if( type == PrimitiveValueNode::BYTE_TYPE ) {
+ if (type == PrimitiveValueNode::BYTE_TYPE) {
return this->dataIn->readByte();
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return Short::valueOf( this->dataIn->readUTF() ).shortValue();
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return Short::valueOf(this->dataIn->readUTF()).shortValue();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to short." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to short.");
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a short type", NULL );
+ throw MessageFormatException(" not a short type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& e ) {
+ } catch (IOException& e) {
throw CMSExceptionSupport::create(e);
}
- throw CMSExceptionSupport::create( ex );
+ throw CMSExceptionSupport::create(ex);
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeShort( short value ) {
+void ActiveMQStreamMessage::writeShort(short value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::SHORT_TYPE );
- this->dataOut->writeShort( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::SHORT_TYPE);
+ this->dataOut->writeShort(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -578,53 +609,53 @@ unsigned short ActiveMQStreamMessage::re
this->dataIn->mark(17);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::SHORT_TYPE ) {
+ if (type == PrimitiveValueNode::SHORT_TYPE) {
return this->dataIn->readUnsignedShort();
}
- if( type == PrimitiveValueNode::BYTE_TYPE ) {
+ if (type == PrimitiveValueNode::BYTE_TYPE) {
return this->dataIn->readByte();
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return Short::valueOf( this->dataIn->readUTF() ).shortValue();
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return Short::valueOf(this->dataIn->readUTF()).shortValue();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to short." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to short.");
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a short type", NULL );
+ throw MessageFormatException(" not a short type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& e ) {
+ } catch (IOException& e) {
throw CMSExceptionSupport::create(e);
}
- throw CMSExceptionSupport::create( ex );
+ throw CMSExceptionSupport::create(ex);
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeUnsignedShort( unsigned short value ) {
+void ActiveMQStreamMessage::writeUnsignedShort(unsigned short value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::SHORT_TYPE );
- this->dataOut->writeUnsignedShort( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::SHORT_TYPE);
+ this->dataOut->writeUnsignedShort(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -638,56 +669,56 @@ int ActiveMQStreamMessage::readInt() con
this->dataIn->mark(33);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::INTEGER_TYPE ) {
+ if (type == PrimitiveValueNode::INTEGER_TYPE) {
return this->dataIn->readInt();
}
- if( type == PrimitiveValueNode::SHORT_TYPE ) {
+ if (type == PrimitiveValueNode::SHORT_TYPE) {
return this->dataIn->readShort();
}
- if( type == PrimitiveValueNode::BYTE_TYPE ) {
+ if (type == PrimitiveValueNode::BYTE_TYPE) {
return this->dataIn->readByte();
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return Integer::valueOf( this->dataIn->readUTF() ).intValue();
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return Integer::valueOf(this->dataIn->readUTF()).intValue();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to int." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to int.");
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a int type", NULL );
+ throw MessageFormatException(" not a int type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& e ) {
+ } catch (IOException& e) {
throw CMSExceptionSupport::create(e);
}
- throw CMSExceptionSupport::create( ex );
+ throw CMSExceptionSupport::create(ex);
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeInt( int value ) {
+void ActiveMQStreamMessage::writeInt(int value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::INTEGER_TYPE );
- this->dataOut->writeInt( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::INTEGER_TYPE);
+ this->dataOut->writeInt(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -701,59 +732,59 @@ long long ActiveMQStreamMessage::readLon
this->dataIn->mark(65);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::LONG_TYPE ) {
+ if (type == PrimitiveValueNode::LONG_TYPE) {
return this->dataIn->readLong();
}
- if( type == PrimitiveValueNode::INTEGER_TYPE ) {
+ if (type == PrimitiveValueNode::INTEGER_TYPE) {
return this->dataIn->readInt();
}
- if( type == PrimitiveValueNode::SHORT_TYPE ) {
+ if (type == PrimitiveValueNode::SHORT_TYPE) {
return this->dataIn->readShort();
}
- if( type == PrimitiveValueNode::BYTE_TYPE ) {
+ if (type == PrimitiveValueNode::BYTE_TYPE) {
return this->dataIn->readByte();
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return Long::valueOf( this->dataIn->readUTF() ).longValue();
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return Long::valueOf(this->dataIn->readUTF()).longValue();
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
this->dataIn->reset();
- throw NullPointerException( __FILE__, __LINE__, "Cannot convert NULL value to long." );
+ throw NullPointerException(__FILE__, __LINE__, "Cannot convert NULL value to long.");
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a long type", NULL );
+ throw MessageFormatException(" not a long type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& e ) {
+ } catch (IOException& e) {
throw CMSExceptionSupport::create(e);
}
- throw CMSExceptionSupport::create( ex );
+ throw CMSExceptionSupport::create(ex);
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeLong( long long value ) {
+void ActiveMQStreamMessage::writeLong(long long value) {
initializeWriting();
- try{
- this->dataOut->write( PrimitiveValueNode::LONG_TYPE );
- this->dataOut->writeLong( value );
+ try {
+ this->dataOut->write(PrimitiveValueNode::LONG_TYPE);
+ this->dataOut->writeLong(value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
@@ -764,94 +795,151 @@ std::string ActiveMQStreamMessage::readS
initializeReading();
try {
- this->dataIn->mark( 65 );
+ this->dataIn->mark(65);
int type = this->dataIn->read();
- if( type == -1 ) {
- throw MessageEOFException( "reached end of data", NULL );
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
}
- if( type == PrimitiveValueNode::NULL_TYPE ) {
+ if (type == PrimitiveValueNode::NULL_TYPE) {
return "";
}
- if( type == PrimitiveValueNode::BIG_STRING_TYPE ) {
- return MarshallingSupport::readString32( *this->dataIn );
+ if (type == PrimitiveValueNode::BIG_STRING_TYPE) {
+ return MarshallingSupport::readString32(*this->dataIn);
}
- if( type == PrimitiveValueNode::STRING_TYPE ) {
- return MarshallingSupport::readString16( *this->dataIn );
+ if (type == PrimitiveValueNode::STRING_TYPE) {
+ return MarshallingSupport::readString16(*this->dataIn);
}
- if( type == PrimitiveValueNode::LONG_TYPE ) {
- return Long( this->dataIn->readLong() ).toString();
+ if (type == PrimitiveValueNode::LONG_TYPE) {
+ return Long(this->dataIn->readLong()).toString();
}
- if( type == PrimitiveValueNode::INTEGER_TYPE ) {
- return Integer( this->dataIn->readInt() ).toString();
+ if (type == PrimitiveValueNode::INTEGER_TYPE) {
+ return Integer(this->dataIn->readInt()).toString();
}
- if( type == PrimitiveValueNode::SHORT_TYPE ) {
- return Short( this->dataIn->readShort() ).toString();
+ if (type == PrimitiveValueNode::SHORT_TYPE) {
+ return Short(this->dataIn->readShort()).toString();
}
- if( type == PrimitiveValueNode::BYTE_TYPE ) {
- return Byte( this->dataIn->readByte() ).toString();
+ if (type == PrimitiveValueNode::BYTE_TYPE) {
+ return Byte(this->dataIn->readByte()).toString();
}
- if( type == PrimitiveValueNode::FLOAT_TYPE ) {
- return Float( this->dataIn->readFloat() ).toString();
+ if (type == PrimitiveValueNode::FLOAT_TYPE) {
+ return Float(this->dataIn->readFloat()).toString();
}
- if( type == PrimitiveValueNode::DOUBLE_TYPE ) {
- return Double( this->dataIn->readDouble() ).toString();
+ if (type == PrimitiveValueNode::DOUBLE_TYPE) {
+ return Double(this->dataIn->readDouble()).toString();
}
- if( type == PrimitiveValueNode::BOOLEAN_TYPE ) {
- return ( this->dataIn->readBoolean() ? Boolean::_TRUE : Boolean::_FALSE ).toString();
+ if (type == PrimitiveValueNode::BOOLEAN_TYPE) {
+ return (this->dataIn->readBoolean() ? Boolean::_TRUE : Boolean::_FALSE).toString();
}
- if( type == PrimitiveValueNode::CHAR_TYPE ) {
- return Character( this->dataIn->readChar() ).toString();
+ if (type == PrimitiveValueNode::CHAR_TYPE) {
+ return Character(this->dataIn->readChar()).toString();
} else {
this->dataIn->reset();
- throw MessageFormatException( " not a String type", NULL );
+ throw MessageFormatException(" not a String type", NULL);
}
- } catch( NumberFormatException& ex ) {
+ } catch (NumberFormatException& ex) {
try {
this->dataIn->reset();
- } catch( IOException& ioe ) {
- throw CMSExceptionSupport::create( ioe );
+ } catch (IOException& ioe) {
+ throw CMSExceptionSupport::create(ioe);
}
- throw CMSExceptionSupport::create( ex );
+ throw CMSExceptionSupport::create(ex);
- } catch( EOFException& e ) {
- throw CMSExceptionSupport::createMessageEOFException( e );
- } catch( IOException& e ) {
- throw CMSExceptionSupport::createMessageFormatException( e );
- } catch( Exception& e ) {
- throw CMSExceptionSupport::create( e );
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
}
}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQStreamMessage::writeString( const std::string& value ) {
+void ActiveMQStreamMessage::writeString(const std::string& value) {
initializeWriting();
- try{
- MarshallingSupport::writeString( *this->dataOut, value );
+ try {
+ MarshallingSupport::writeString(*this->dataOut, value);
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
}
////////////////////////////////////////////////////////////////////////////////
+cms::Message::ValueType ActiveMQStreamMessage::getNextValueType() const {
+
+ initializeReading();
+ try {
+
+ if (this->impl->remainingBytes != -1) {
+ throw cms::IllegalStateException(
+ "Cannot read the next type during an byte array read operation, complete the read first.");
+ }
+
+ this->dataIn->mark(10);
+ int type = this->dataIn->read();
+
+ if (type == -1) {
+ throw MessageEOFException("reached end of data", NULL);
+ }
+
+ this->dataIn->reset();
+
+ switch(type) {
+ case util::PrimitiveValueNode::NULL_TYPE:
+ return cms::Message::NULL_TYPE;
+ case util::PrimitiveValueNode::BOOLEAN_TYPE:
+ return cms::Message::BOOLEAN_TYPE;
+ case util::PrimitiveValueNode::BYTE_TYPE:
+ return cms::Message::BYTE_TYPE;
+ case util::PrimitiveValueNode::CHAR_TYPE:
+ return cms::Message::CHAR_TYPE;
+ case util::PrimitiveValueNode::SHORT_TYPE:
+ return cms::Message::SHORT_TYPE;
+ case util::PrimitiveValueNode::INTEGER_TYPE:
+ return cms::Message::INTEGER_TYPE;
+ case util::PrimitiveValueNode::LONG_TYPE:
+ return cms::Message::LONG_TYPE;
+ case util::PrimitiveValueNode::DOUBLE_TYPE:
+ return cms::Message::DOUBLE_TYPE;
+ case util::PrimitiveValueNode::FLOAT_TYPE:
+ return cms::Message::FLOAT_TYPE;
+ case util::PrimitiveValueNode::STRING_TYPE:
+ case util::PrimitiveValueNode::BIG_STRING_TYPE:
+ return cms::Message::STRING_TYPE;
+ default:
+ throw MessageFormatException("Unknown type found in stream", NULL);
+ }
+
+ return cms::Message::UNKNOWN_TYPE;
+
+ } catch (EOFException& e) {
+ throw CMSExceptionSupport::createMessageEOFException(e);
+ } catch (IOException& e) {
+ throw CMSExceptionSupport::createMessageFormatException(e);
+ } catch (Exception& e) {
+ throw CMSExceptionSupport::create(e);
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
void ActiveMQStreamMessage::storeContent() {
- if( this->dataOut.get() != NULL) {
+ if (this->dataOut.get() != NULL) {
this->dataOut->close();
- if( this->bytesOut->size() > 0 ) {
- std::pair<unsigned char*, int> array = this->bytesOut->toByteArray();
- this->setContent( std::vector<unsigned char>( array.first, array.first + array.second ) );
- delete [] array.first;
+ if (this->impl->bytesOut->size() > 0) {
+ std::pair<unsigned char*, int> array = this->impl->bytesOut->toByteArray();
+ this->setContent(std::vector<unsigned char>(array.first, array.first + array.second));
+ delete[] array.first;
}
- this->dataOut.reset( NULL );
- this->bytesOut = NULL;
+ this->dataOut.reset(NULL);
+ this->impl->bytesOut = NULL;
}
}
@@ -860,15 +948,15 @@ void ActiveMQStreamMessage::initializeRe
this->failIfWriteOnlyBody();
try {
- if( this->dataIn.get() == NULL) {
- InputStream* is = new ByteArrayInputStream( this->getContent() );
+ if (this->dataIn.get() == NULL) {
+ InputStream* is = new ByteArrayInputStream(this->getContent());
- if( isCompressed() ) {
- is = new InflaterInputStream( is, true );
- is = new BufferedInputStream( is, true );
+ if (isCompressed()) {
+ is = new InflaterInputStream(is, true);
+ is = new BufferedInputStream(is, true);
}
- this->dataIn.reset( new DataInputStream( is, true ) );
+ this->dataIn.reset(new DataInputStream(is, true));
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -878,21 +966,21 @@ void ActiveMQStreamMessage::initializeRe
void ActiveMQStreamMessage::initializeWriting() {
this->failIfReadOnlyBody();
- try{
- if( this->dataOut.get() == NULL ) {
- this->bytesOut = new ByteArrayOutputStream();
+ try {
+ if (this->dataOut.get() == NULL) {
+ this->impl->bytesOut = new ByteArrayOutputStream();
- OutputStream* os = this->bytesOut;
+ OutputStream* os = this->impl->bytesOut;
- if( this->connection != NULL && this->connection->isUseCompression() ) {
+ if (this->connection != NULL && this->connection->isUseCompression()) {
this->compressed = true;
- Deflater* deflator = new Deflater( this->connection->getCompressionLevel() );
+ Deflater* deflator = new Deflater(this->connection->getCompressionLevel());
- os = new DeflaterOutputStream( os, deflator, true, true );
+ os = new DeflaterOutputStream(os, deflator, true, true);
}
- this->dataOut.reset( new DataOutputStream( os, true ) );
+ this->dataOut.reset(new DataOutputStream(os, true));
}
}
AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/commands/ActiveMQStreamMessage.h Mon Jun 11 15:45:23 2012
@@ -34,108 +34,109 @@
#include <decaf/lang/exceptions/NullPointerException.h>
#include <decaf/io/DataInputStream.h>
#include <decaf/io/DataOutputStream.h>
-#include <decaf/io/ByteArrayOutputStream.h>
#include <string>
#include <memory>
-namespace activemq{
-namespace commands{
+namespace activemq {
+namespace commands {
- class AMQCPP_API ActiveMQStreamMessage :
- public ActiveMQMessageTemplate< cms::StreamMessage > {
+ class ActiveMQStreamMessageImpl;
+
+ class AMQCPP_API ActiveMQStreamMessage: public ActiveMQMessageTemplate<cms::StreamMessage> {
private:
- decaf::io::ByteArrayOutputStream* bytesOut;
+ ActiveMQStreamMessageImpl* impl;
+
mutable std::auto_ptr<decaf::io::DataInputStream> dataIn;
mutable std::auto_ptr<decaf::io::DataOutputStream> dataOut;
- mutable int remainingBytes;
-
public:
const static unsigned char ID_ACTIVEMQSTREAMMESSAGE = 27;
private:
- ActiveMQStreamMessage( const ActiveMQStreamMessage& );
- ActiveMQStreamMessage& operator= ( const ActiveMQStreamMessage& );
+ ActiveMQStreamMessage(const ActiveMQStreamMessage&);
+ ActiveMQStreamMessage& operator=(const ActiveMQStreamMessage&);
public:
ActiveMQStreamMessage();
- virtual ~ActiveMQStreamMessage() throw();
+ virtual ~ActiveMQStreamMessage() throw ();
virtual unsigned char getDataStructureType() const;
virtual ActiveMQStreamMessage* cloneDataStructure() const;
- virtual void copyDataStructure( const DataStructure* src );
+ virtual void copyDataStructure(const DataStructure* src);
virtual std::string toString() const;
- virtual bool equals( const DataStructure* value ) const;
+ virtual bool equals(const DataStructure* value) const;
virtual void onSend();
- public: // CMS Message
+ public: // CMS Message
virtual cms::StreamMessage* clone() const {
- return dynamic_cast<cms::StreamMessage*>( this->cloneDataStructure() );
+ return dynamic_cast<cms::StreamMessage*> (this->cloneDataStructure());
}
virtual void clearBody();
public: // CMS Stream Message
+ virtual ValueType getNextValueType() const;
+
virtual void reset();
virtual bool readBoolean() const;
- virtual void writeBoolean( bool value );
+ virtual void writeBoolean(bool value);
virtual unsigned char readByte() const;
- virtual void writeByte( unsigned char value );
+ virtual void writeByte(unsigned char value);
- virtual int readBytes( std::vector<unsigned char>& value ) const;
+ virtual int readBytes(std::vector<unsigned char>& value) const;
- virtual void writeBytes( const std::vector<unsigned char>& value );
+ virtual void writeBytes(const std::vector<unsigned char>& value);
- virtual int readBytes( unsigned char* buffer, int length ) const;
+ virtual int readBytes(unsigned char* buffer, int length) const;
- virtual void writeBytes( const unsigned char* value, int offset, int length );
+ virtual void writeBytes(const unsigned char* value, int offset, int length);
virtual char readChar() const;
- virtual void writeChar( char value );
+ virtual void writeChar(char value);
virtual float readFloat() const;
- virtual void writeFloat( float value );
+ virtual void writeFloat(float value);
virtual double readDouble() const;
- virtual void writeDouble( double value );
+ virtual void writeDouble(double value);
virtual short readShort() const;
- virtual void writeShort( short value );
+ virtual void writeShort(short value);
virtual unsigned short readUnsignedShort() const;
- virtual void writeUnsignedShort( unsigned short value );
+ virtual void writeUnsignedShort(unsigned short value);
virtual int readInt() const;
- virtual void writeInt( int value );
+ virtual void writeInt(int value);
virtual long long readLong() const;
- virtual void writeLong( long long value );
+ virtual void writeLong(long long value);
virtual std::string readString() const;
- virtual void writeString( const std::string& value );
+ virtual void writeString(const std::string& value);
private:
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp Mon Jun 11 15:45:23 2012
@@ -186,15 +186,54 @@ bool ActiveMQMessageTransformation::tran
ActiveMQStreamMessage* msg = new ActiveMQStreamMessage();
msg->setConnection(connection);
- // TODO Need element enumeration for StreamMessage
-// try {
-// while ((obj = streamMessage->readObject()) != NULL) {
-// msg->writeObject(obj);
-// }
-// } catch (MessageEOFException e) {
-// // if an end of message stream as expected
-// } catch (JMSException e) {
-// }
+ try {
+ while(true) {
+ cms::Message::ValueType elementType = streamMessage->getNextValueType();
+ int result = -1;
+ std::vector<unsigned char> buffer(255);
+
+ switch(elementType) {
+ case cms::Message::BOOLEAN_TYPE:
+ msg->writeBoolean(streamMessage->readBoolean());
+ break;
+ case cms::Message::BYTE_TYPE:
+ msg->writeBoolean(streamMessage->readBoolean());
+ break;
+ case cms::Message::BYTE_ARRAY_TYPE:
+ while ((result = streamMessage->readBytes(buffer)) != -1) {
+ msg->writeBytes(&buffer[0], 0, result);
+ buffer.clear();
+ }
+ break;
+ case cms::Message::CHAR_TYPE:
+ msg->writeChar(streamMessage->readChar());
+ break;
+ case cms::Message::SHORT_TYPE:
+ msg->writeShort(streamMessage->readShort());
+ break;
+ case cms::Message::INTEGER_TYPE:
+ msg->writeInt(streamMessage->readInt());
+ break;
+ case cms::Message::LONG_TYPE:
+ msg->writeLong(streamMessage->readLong());
+ break;
+ case cms::Message::FLOAT_TYPE:
+ msg->writeFloat(streamMessage->readFloat());
+ break;
+ case cms::Message::DOUBLE_TYPE:
+ msg->writeDouble(streamMessage->readDouble());
+ break;
+ case cms::Message::STRING_TYPE:
+ msg->writeString(streamMessage->readString());
+ break;
+ default:
+ break;
+ }
+ }
+ } catch (cms::MessageEOFException& e) {
+ // if an end of message stream as expected
+ } catch (cms::CMSException& e) {
+ }
*amqMessage = msg;
} else if (dynamic_cast<cms::TextMessage*>(message) != NULL) {
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/StreamMessage.h Mon Jun 11 15:45:23 2012
@@ -64,6 +64,22 @@ namespace cms {
virtual ~StreamMessage();
/**
+ * Returns the value type for the element in the StreamMessage. The CMS provider
+ * should translate all internal type identifiers into the CMS Value types returning
+ * UNKNOWN_TYPE for any specialized types not directly supported in the CMS API.
+ * The call can fail if the StreamMessage is currently in the middle of a ready of
+ * a Byte array.
+ *
+ * @returns The ValueType contained in the next message element.
+ *
+ * @throws CMSException if no property exists that matches the requested key.
+ * @throw MessageEOFException - if unexpected end of message stream has been reached.
+ * @throw MessageFormatException - if the message contains invalid data.
+ * @throw MessageNotReadableException - if the message is in write-only mode.
+ */
+ virtual ValueType getNextValueType() const = 0;
+
+ /**
* Reads a Boolean from the Stream message stream
* @returns boolean value from stream
*
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp?rev=1348915&r1=1348914&r2=1348915&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/commands/ActiveMQStreamMessageTest.cpp Mon Jun 11 15:45:23 2012
@@ -77,14 +77,23 @@ void ActiveMQStreamMessageTest::testSetA
myMessage.reset();
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::BOOLEAN_TYPE );
CPPUNIT_ASSERT( myMessage.readBoolean() == false );
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::BYTE_TYPE );
CPPUNIT_ASSERT( myMessage.readByte() == 127 );
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::CHAR_TYPE );
CPPUNIT_ASSERT( myMessage.readChar() == 'a' );
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::SHORT_TYPE );
CPPUNIT_ASSERT( myMessage.readShort() == 32000 );
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::INTEGER_TYPE );
CPPUNIT_ASSERT( myMessage.readInt() == 6789999 );
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::LONG_TYPE );
CPPUNIT_ASSERT( myMessage.readLong() == 0xFFFAAA33345LL );
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::FLOAT_TYPE );
CPPUNIT_ASSERT( myMessage.readFloat() == 0.000012f );
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::DOUBLE_TYPE );
CPPUNIT_ASSERT( myMessage.readDouble() == 64.54654 );
+ CPPUNIT_ASSERT( myMessage.getNextValueType() == cms::Message::BYTE_ARRAY_TYPE );
CPPUNIT_ASSERT( myMessage.readBytes( readData ) == (int)data.size() );
}
@@ -898,6 +907,11 @@ void ActiveMQStreamMessageTest::testWrit
CPPUNIT_FAIL("Should be writeable");
}
try {
+ message.getNextValueType();
+ CPPUNIT_FAIL("Should have thrown exception");
+ } catch( MessageNotReadableException& mnwe ) {
+ }
+ try {
message.readBoolean();
CPPUNIT_FAIL("Should have thrown exception");
} catch( MessageNotReadableException& mnwe ) {