You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by js...@apache.org on 2006/03/01 15:28:20 UTC
svn commit: r382028 [3/4] - in /incubator/activemq/trunk/cms: ./
activemqcms/ activemqcms/src/ activemqcms/src/activemq/
activemqcms/src/activemq/concurrent/ activemqcms/src/activemq/io/
activemqcms/src/activemq/transport/ activemqcms/src/activemq/tran...
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/DisconnectMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+  * Adapter that converts between disconnect messages and stomp frames.
+  * @author Nathan Mittler
+  */
+ class DisconnectProtocolAdapter : public ProtocolAdapter
+ {
+ public:
+
+     virtual ~DisconnectProtocolAdapter(){}
+
+     /**
+      * Builds the message for an incoming frame.
+      * @param frame The incoming frame; its contents are not examined.
+      * @return A newly allocated DisconnectMessage.
+      */
+     virtual StompMessage* adapt( const StompFrame* frame ){
+         return new DisconnectMessage();
+     }
+
+     /**
+      * Builds the frame for an outgoing message.
+      * @param message The message to convert; it is dereferenced
+      * without a NULL check.
+      * @return A newly allocated frame on which only the command
+      * matching the message's type has been set.
+      */
+     virtual StompFrame* adapt( const StompMessage* message ){
+         StompFrame* outgoing = new StompFrame();
+         const StompFrame::Command command = getCommandId( message->getMessageType() );
+         outgoing->setCommand( command );
+         return outgoing;
+     }
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_DISCONNECTPROTOCOLADAPTER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_ERRORMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_ERRORMESSAGE_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+  * Message sent from the broker when an error occurs. It carries a
+  * title and a descriptive text and has no CMS message representation
+  * (both getCMSMessage() overloads return NULL).
+  * @author Nathan Mittler
+  */
+ class ErrorMessage : public StompMessage
+ {
+ public:
+
+     virtual ~ErrorMessage(){}
+
+     /** @return MSG_ERROR, always. */
+     virtual MessageType getMessageType() const{
+         return MSG_ERROR;
+     }
+
+     /** @return NULL - an error message is not a CMS message. */
+     virtual const cms::Message* getCMSMessage() const{
+         return NULL;
+     }
+
+     /** @return NULL - an error message is not a CMS message. */
+     virtual cms::Message* getCMSMessage(){
+         return NULL;
+     }
+
+     /**
+      * Sets the error title.
+      * @param title Null-terminated title text. NULL is stored as an
+      * empty title, because assigning a NULL pointer to a std::string
+      * is undefined behaviour.
+      */
+     virtual void setErrorTitle( const char* title ){
+         errorTitle = ( title != NULL ) ? title : "";
+     }
+
+     /** @return The title last set, or an empty string. */
+     virtual const std::string& getErrorTitle() const{
+         return errorTitle;
+     }
+
+     /**
+      * Sets the error text.
+      * @param text Null-terminated error text. NULL is stored as an
+      * empty text (same reason as for setErrorTitle).
+      */
+     virtual void setErrorText( const char* text ){
+         errorText = ( text != NULL ) ? text : "";
+     }
+
+     /** @return The text last set, or an empty string. */
+     virtual const std::string& getErrorText() const{
+         return errorText;
+     }
+
+     /**
+      * Clones the CMS view of this message.
+      * @return NULL, always. This class exposes no cms::Message, so a
+      * copy could never be reached through the returned pointer;
+      * allocating one here would only leak it.
+      */
+     virtual cms::Message* clone() const{
+         return NULL;
+     }
+
+ private:
+
+     std::string errorTitle;
+     std::string errorText;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_ERRORMESSAGE_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_ERRORPROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_ERRORPROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/ProtocolAdapter.h>
+#include <activemq/transport/stomp/ErrorMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+  * Adapts between error messages and stomp frames.
+  * @author Nathan Mittler
+  */
+ class ErrorProtocolAdapter : public ProtocolAdapter{
+ public:
+
+     virtual ~ErrorProtocolAdapter(){}
+
+     /**
+      * Builds an ErrorMessage from an incoming frame.
+      * @param frame The frame to read; must not be NULL.
+      * @return A newly allocated ErrorMessage. The title is taken from
+      * the "message" header and the text from the frame body; either
+      * one is left empty when the frame does not provide it.
+      */
+     virtual StompMessage* adapt( const StompFrame* frame ){
+
+         // Both lookups may legitimately yield NULL: the header can be
+         // absent and a frame's body pointer starts out as NULL.
+         const StompFrame::HeaderInfo* title = frame->getHeaderInfo( StompFrame::HEADER_MESSAGE );
+         const char* titleText = ( title != NULL ) ? title->value : NULL;
+         const char* bodyText = frame->getBody();
+
+         ErrorMessage* msg = new ErrorMessage();
+         msg->setErrorTitle( titleText != NULL ? titleText : "" );
+         msg->setErrorText( bodyText != NULL ? bodyText : "" );
+
+         return msg;
+     }
+
+     /**
+      * Builds an outgoing frame from an ErrorMessage.
+      * @param message The message to convert.
+      * @return A newly allocated frame, or NULL when message is NULL
+      * or is not an ErrorMessage. The frame stores pointers into the
+      * message's title and text strings rather than copies, so the
+      * message must outlive the frame and must not be modified while
+      * the frame is in use.
+      */
+     virtual StompFrame* adapt( const StompMessage* message ){
+
+         // Check the cast before allocating, so that a mismatched
+         // message neither crashes nor leaks a frame.
+         const ErrorMessage* msg = dynamic_cast<const ErrorMessage*>(message);
+         if( msg == NULL ){
+             return NULL;
+         }
+
+         StompFrame* frame = new StompFrame();
+
+         // Set command.
+         frame->setCommand( getCommandId( msg->getMessageType() ) );
+
+         // Set the "message" header
+         frame->setHeader( StompFrame::HEADER_MESSAGE,
+             msg->getErrorTitle().c_str(),
+             msg->getErrorTitle().size() );
+
+         // Set the error text.
+         frame->setBodyText( msg->getErrorText().c_str(),
+             msg->getErrorText().size() );
+
+         return frame;
+     }
+ };
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_ERRORPROTOCOLADAPTER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ProtocolAdapter.h"
+
+using namespace activemq::transport::stomp;
+
+////////////////////////////////////////////////////////////////////////////////
+StompFrame::Command ProtocolAdapter::getCommandId( const StompMessage::MessageType type ){
+
+    // Any message type without a matching command yields the
+    // NUM_COMMANDS sentinel.
+    StompFrame::Command command = StompFrame::NUM_COMMANDS;
+
+    switch( type ){
+        case StompMessage::MSG_CONNECT:     command = StompFrame::COMMAND_CONNECT;     break;
+        case StompMessage::MSG_CONNECTED:   command = StompFrame::COMMAND_CONNECTED;   break;
+        case StompMessage::MSG_DISCONNECT:  command = StompFrame::COMMAND_DISCONNECT;  break;
+        case StompMessage::MSG_SUBSCRIBE:   command = StompFrame::COMMAND_SUBSCRIBE;   break;
+        case StompMessage::MSG_UNSUBSCRIBE: command = StompFrame::COMMAND_UNSUBSCRIBE; break;
+
+        // Text and bytes messages both go out as SEND frames.
+        case StompMessage::MSG_TEXT:
+        case StompMessage::MSG_BYTES:       command = StompFrame::COMMAND_SEND;        break;
+
+        case StompMessage::MSG_BEGIN:       command = StompFrame::COMMAND_BEGIN;       break;
+        case StompMessage::MSG_COMMIT:      command = StompFrame::COMMAND_COMMIT;      break;
+        case StompMessage::MSG_ABORT:       command = StompFrame::COMMAND_ABORT;       break;
+        case StompMessage::MSG_ACK:         command = StompFrame::COMMAND_ACK;         break;
+        case StompMessage::MSG_ERROR:       command = StompFrame::COMMAND_ERROR;       break;
+        default:                                                                       break;
+    }
+
+    return command;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompMessage::MessageType ProtocolAdapter::getMessageType( const StompFrame* frame ){
+
+    // Any command without a matching message type yields the
+    // NUM_MSG_TYPES sentinel.
+    StompMessage::MessageType type = StompMessage::NUM_MSG_TYPES;
+
+    switch( frame->getCommand() ){
+        case StompFrame::COMMAND_CONNECT:     type = StompMessage::MSG_CONNECT;     break;
+        case StompFrame::COMMAND_CONNECTED:   type = StompMessage::MSG_CONNECTED;   break;
+        case StompFrame::COMMAND_DISCONNECT:  type = StompMessage::MSG_DISCONNECT;  break;
+        case StompFrame::COMMAND_SUBSCRIBE:   type = StompMessage::MSG_SUBSCRIBE;   break;
+        case StompFrame::COMMAND_UNSUBSCRIBE: type = StompMessage::MSG_UNSUBSCRIBE; break;
+
+        // A MESSAGE frame is binary exactly when it carries a
+        // content-length header; otherwise it is treated as text.
+        case StompFrame::COMMAND_MESSAGE:
+            type = ( frame->getHeaderInfo( StompFrame::HEADER_CONTENTLENGTH ) != NULL )
+                ? StompMessage::MSG_BYTES
+                : StompMessage::MSG_TEXT;
+            break;
+
+        case StompFrame::COMMAND_BEGIN:       type = StompMessage::MSG_BEGIN;       break;
+        case StompFrame::COMMAND_COMMIT:      type = StompMessage::MSG_COMMIT;      break;
+        case StompFrame::COMMAND_ABORT:       type = StompMessage::MSG_ABORT;       break;
+        case StompFrame::COMMAND_ACK:         type = StompMessage::MSG_ACK;         break;
+        case StompFrame::COMMAND_ERROR:       type = StompMessage::MSG_ERROR;       break;
+        default:                                                                    break;
+    }
+
+    return type;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Session::AcknowledgeMode ProtocolAdapter::getAckMode( const StompFrame* frame ){
+
+    // No "ack" header at all means automatic acknowledgement.
+    const StompFrame::HeaderInfo* ackHeader = frame->getHeaderInfo( StompFrame::HEADER_ACK );
+    if( ackHeader == NULL ){
+        return cms::Session::AUTO_ACKNOWLEDGE;
+    }
+
+    // Only a header value that parses as the client mode selects client
+    // acknowledgement; every other value falls back to automatic.
+    const bool clientAck =
+        StompFrame::toAckMode( ackHeader->value ) == StompFrame::ACK_CLIENT;
+
+    return clientAck ? cms::Session::CLIENT_ACKNOWLEDGE
+                     : cms::Session::AUTO_ACKNOWLEDGE;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const char* ProtocolAdapter::getAckModeString( const cms::Session::AcknowledgeMode mode ){
+
+    // Every CMS mode other than CLIENT_ACKNOWLEDGE is sent as "auto".
+    const StompFrame::AckMode stompMode =
+        ( mode == cms::Session::CLIENT_ACKNOWLEDGE ) ? StompFrame::ACK_CLIENT
+                                                     : StompFrame::ACK_AUTO;
+
+    return StompFrame::toString( stompMode );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ProtocolAdapter::getAckModeStringLength( const cms::Session::AcknowledgeMode mode ){
+
+    // Same mapping as getAckModeString: only CLIENT_ACKNOWLEDGE selects
+    // the client mode, everything else is treated as auto.
+    const StompFrame::AckMode stompMode =
+        ( mode == cms::Session::CLIENT_ACKNOWLEDGE ) ? StompFrame::ACK_CLIENT
+                                                     : StompFrame::ACK_AUTO;
+
+    return StompFrame::getAckModeLength( stompMode );
+}
+
+
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,56 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_PROTOCOLADAPTER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_PROTOCOLADAPTER_H_
+
+#include <activemq/transport/stomp/StompMessage.h>
+#include <activemq/transport/stomp/StompFrame.h>
+#include <cms/Session.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ // Forward declarations.
+ class StompMessage;
+
+ /**
+ * Interface for all adapters between messages and
+ * stomp frames. Each concrete adapter converts in both
+ * directions; the static helpers translate between the
+ * enumerations used by the message layer, the frame layer
+ * and the CMS session.
+ * @author Nathan Mittler
+ */
+ class ProtocolAdapter
+ {
+ public:
+ virtual ~ProtocolAdapter(){};
+
+ /**
+ * Converts an incoming stomp frame into a message.
+ * @param frame The frame to convert.
+ * @return The resulting message. The adapters defined alongside
+ * this interface return a newly allocated object - presumably the
+ * caller owns it; confirm against the callers.
+ */
+ virtual StompMessage* adapt( const StompFrame* frame ) = 0;
+
+ /**
+ * Converts an outgoing message into a stomp frame.
+ * @param message The message to convert.
+ * @return The resulting frame (same ownership caveat as above).
+ */
+ virtual StompFrame* adapt( const StompMessage* message ) = 0;
+
+ public:
+
+ /**
+ * Maps a message type to the frame command used to send it.
+ * Text and bytes messages both map to COMMAND_SEND.
+ * @return The command, or StompFrame::NUM_COMMANDS when the
+ * type has no mapping.
+ */
+ static StompFrame::Command getCommandId( const StompMessage::MessageType type );
+
+ /**
+ * Maps a frame's command to a message type. A MESSAGE frame is
+ * MSG_BYTES when it has a content-length header and MSG_TEXT
+ * otherwise.
+ * @return The type, or StompMessage::NUM_MSG_TYPES when the
+ * command has no mapping.
+ */
+ static StompMessage::MessageType getMessageType( const StompFrame* frame );
+
+ /**
+ * Reads the acknowledge mode from a frame's "ack" header.
+ * @return CLIENT_ACKNOWLEDGE when the header is present and
+ * parses as the client mode; AUTO_ACKNOWLEDGE in every other case.
+ */
+ static cms::Session::AcknowledgeMode getAckMode( const StompFrame* frame );
+
+ /**
+ * @return The stomp string for the given mode: the client string
+ * for CLIENT_ACKNOWLEDGE, the auto string for anything else.
+ */
+ static const char* getAckModeString( const cms::Session::AcknowledgeMode mode );
+
+ /**
+ * @return The length of the string that getAckModeString yields
+ * for the same mode.
+ */
+ static int getAckModeStringLength( const cms::Session::AcknowledgeMode mode );
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_PROTOCOLADAPTER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,155 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPBYTESMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPBYTESMESSAGE_H_
+
+#include <activemq/transport/stomp/DestinationMessage.h>
+#include <activemq/transport/stomp/TransactionMessage.h>
+#include <cms/BytesMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+  * A binary data message.
+  *
+  * The payload is either a private copy made by setData() or a
+  * borrowed pointer handed to setDataNoCopy(), which the caller must
+  * keep valid. Copy construction, assignment and clone() always
+  * produce a private deep copy of the payload and transaction id;
+  * they copy only the state held by this class.
+  * @author Nathan Mittler
+  */
+ class StompBytesMessage
+ :
+     public DestinationMessage,
+     public TransactionMessage,
+     public cms::BytesMessage
+ {
+ public:
+
+     StompBytesMessage()
+     :   transactionId( NULL ),
+         numBytes( 0 ),
+         data( NULL ),
+         own( false )
+     {
+     }
+
+     /**
+      * Deep-copying copy constructor. Without it the implicit copy
+      * would share the raw pointers and both objects would delete them.
+      */
+     StompBytesMessage( const StompBytesMessage& other )
+     :   transactionId( NULL ),
+         numBytes( 0 ),
+         data( NULL ),
+         own( false )
+     {
+         copyFrom( other );
+     }
+
+     /** Deep-copying assignment; self-assignment is a no-op. */
+     StompBytesMessage& operator=( const StompBytesMessage& other ){
+         if( this != &other ){
+             copyFrom( other );
+         }
+         return *this;
+     }
+
+     virtual ~StompBytesMessage(){
+         // Deleting a NULL pointer is a no-op, so no check is needed.
+         delete transactionId;
+         clearData();
+     }
+
+     virtual MessageType getMessageType() const{
+         return MSG_BYTES;
+     }
+     virtual const cms::Message* getCMSMessage() const{
+         return this;
+     }
+     virtual cms::Message* getCMSMessage(){
+         return this;
+     }
+
+     virtual void setDestination( const char* destination ){
+         this->destination = destination;
+     }
+
+     virtual const char* getDestination() const{
+         return destination.c_str();
+     }
+
+     /** @return true when a transaction id has been set. */
+     virtual bool isTransaction() const{
+         return transactionId != NULL;
+     }
+
+     /** @return The transaction id, or NULL when none is set. */
+     virtual const char* getTransactionId() const{
+         if( isTransaction() ){
+             return transactionId->c_str();
+         }
+         return NULL;
+     }
+
+     /**
+      * Sets or clears the transaction id.
+      * @param id Null-terminated id to copy; NULL clears the id so
+      * that isTransaction() becomes false.
+      */
+     virtual void setTransactionId( const char* id ){
+         // Build the replacement first so the current id survives a
+         // failed allocation and so id may point into the current id.
+         std::string* replacement = ( id != NULL ) ? new std::string( id ) : NULL;
+         delete transactionId;
+         transactionId = replacement;
+     }
+
+     /**
+      * Stores a private copy of the given bytes.
+      * @param data Source buffer; may be (part of) the buffer this
+      * message currently holds. NULL leaves the message empty.
+      * @param numBytes Number of bytes to copy; a negative count
+      * leaves the message empty.
+      */
+     virtual void setData( const char* data, const int numBytes ) throw(cms::CMSException){
+
+         // Copy BEFORE releasing the old buffer: the source may alias it.
+         char* copy = NULL;
+         int copied = 0;
+         if( data != NULL && numBytes >= 0 ){
+             copy = new char[numBytes];
+             if( numBytes > 0 ){
+                 memcpy( copy, data, numBytes );
+             }
+             copied = numBytes;
+         }
+
+         clearData();
+
+         own = ( copy != NULL );
+         this->data = copy;
+         this->numBytes = copied;
+     }
+
+     /**
+      * Stores the given pointer without copying. The caller keeps
+      * ownership and must keep the buffer valid while it is in use.
+      * @param data The buffer to reference.
+      * @param numBytes Number of bytes in the buffer.
+      */
+     virtual void setDataNoCopy( const char* data, const int numBytes ) throw(cms::CMSException){
+
+         // Being handed back the buffer already held: releasing it would
+         // leave a dangling pointer, so only the length is updated.
+         if( data != NULL && data == this->data ){
+             this->numBytes = numBytes;
+             return;
+         }
+
+         clearData();
+
+         own = false;
+
+         this->data = const_cast<char*>(data);
+         this->numBytes = numBytes;
+     }
+
+     virtual int getNumBytes() const{
+         return numBytes;
+     }
+
+     virtual const char* getData() const{
+         return data;
+     }
+
+     /** No-op in this class. */
+     virtual void acknowledge() throw( cms::CMSException ){
+     }
+
+     /**
+      * @return A newly allocated message with the same destination,
+      * transaction id and a private copy of the payload.
+      */
+     virtual cms::Message* clone() const{
+         return new StompBytesMessage( *this );
+     }
+
+ protected:
+
+     /**
+      * Drops the current payload, freeing it only when this message
+      * owns it, and resets the message to the empty state.
+      */
+     void clearData(){
+         if( data != NULL && own ){
+             delete [] data;
+         }
+         data = NULL;
+         numBytes = 0;
+         own = false;
+     }
+
+ private:
+
+     /**
+      * Deep-copies the state held by this class from another message.
+      * Calls are class-qualified so that no virtual dispatch happens
+      * while copying.
+      */
+     void copyFrom( const StompBytesMessage& other ){
+         destination = other.destination;
+         StompBytesMessage::setTransactionId(
+             other.transactionId != NULL ? other.transactionId->c_str() : NULL );
+         StompBytesMessage::setData( other.data, other.numBytes );
+     }
+
+ private:
+
+     std::string destination;
+     std::string* transactionId;
+     int numBytes;
+     char* data;
+     bool own;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPBYTESMESSAGE_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,138 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StompFrame.h"
+#include <stdio.h>
+
+using namespace activemq::transport::stomp;
+using namespace std;
+
+// Class-wide lookup tables (header names, command names, ack-mode names
+// and the string length of each entry) plus the flag that records
+// whether they have been filled. staticInit() fills the tables and sets
+// the flag; the StompFrame constructor calls it while the flag is false.
+bool StompFrame::staticInitialized = false;
+const char* StompFrame::standardHeaders[NUM_STANDARD_HEADERS];
+int StompFrame::standardHeaderLengths[NUM_STANDARD_HEADERS];
+const char* StompFrame::commands[NUM_COMMANDS];
+int StompFrame::commandLengths[NUM_COMMANDS];
+const char* StompFrame::ackModes[NUM_ACK_MODES];
+int StompFrame::ackModeLengths[NUM_ACK_MODES];
+
+////////////////////////////////////////////////////////////////////////////////
+StompFrame::StompFrame(){
+
+    // Give every member a defined value. Without this, getCommand() on
+    // a fresh frame reads an indeterminate enum and getNextHeader()
+    // called before getFirstHeader() uses an uninitialized iterator.
+    // NUM_COMMANDS is the "no command" sentinel.
+    command = NUM_COMMANDS;
+    body = NULL;
+    bodyLength = 0;
+    bodyLengthStr[0] = '\0';
+    pos = headers.end();
+
+    // Fill the shared lookup tables the first time any frame is built.
+    // NOTE(review): the check is not synchronized - confirm that the
+    // first frame is never constructed concurrently.
+    if( !staticInitialized ){
+        staticInit();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Intentionally empty: setHeader/setBodyText/setBodyBytes only store the
+// pointers they are given, so the frame has nothing of its own to free
+// beyond what its members release themselves.
+StompFrame::~StompFrame(){
+}
+
+void StompFrame::staticInit(){
+
+    // Names of the standard headers.
+    standardHeaders[HEADER_DESTINATION] = "destination";
+    standardHeaders[HEADER_TRANSACTIONID] = "transaction";
+    standardHeaders[HEADER_CONTENTLENGTH] = "content-length";
+    standardHeaders[HEADER_SESSIONID] = "session";
+    standardHeaders[HEADER_RECEIPTID] = "receipt";
+    standardHeaders[HEADER_MESSAGEID] = "message-id";
+    standardHeaders[HEADER_ACK] = "ack";
+    standardHeaders[HEADER_LOGIN] = "login";
+    standardHeaders[HEADER_PASSWORD] = "passcode";
+    standardHeaders[HEADER_MESSAGE] = "message";
+
+    // Cache the length of every header name.
+    for( int header = 0; header < NUM_STANDARD_HEADERS; ++header ){
+        standardHeaderLengths[header] = strlen( standardHeaders[header] );
+    }
+
+    // Names of the frame commands.
+    commands[COMMAND_CONNECT] = "CONNECT";
+    commands[COMMAND_CONNECTED] = "CONNECTED";
+    commands[COMMAND_DISCONNECT] = "DISCONNECT";
+    commands[COMMAND_SUBSCRIBE] = "SUBSCRIBE";
+    commands[COMMAND_UNSUBSCRIBE] = "UNSUBSCRIBE";
+    commands[COMMAND_MESSAGE] = "MESSAGE";
+    commands[COMMAND_SEND] = "SEND";
+    commands[COMMAND_BEGIN] = "BEGIN";
+    commands[COMMAND_COMMIT] = "COMMIT";
+    commands[COMMAND_ABORT] = "ABORT";
+    commands[COMMAND_ACK] = "ACK";
+    commands[COMMAND_ERROR] = "ERROR";
+
+    // Cache the length of every command name.
+    for( int cmd = 0; cmd < NUM_COMMANDS; ++cmd ){
+        commandLengths[cmd] = strlen( commands[cmd] );
+    }
+
+    // Names of the acknowledge modes.
+    ackModes[ACK_CLIENT] = "client";
+    ackModes[ACK_AUTO] = "auto";
+
+    // Cache the length of every ack mode name.
+    for( int mode = 0; mode < NUM_ACK_MODES; ++mode ){
+        ackModeLengths[mode] = strlen( ackModes[mode] );
+    }
+
+    // Mark the tables as ready so the constructor stops calling this.
+    staticInitialized = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompFrame::setHeader( const char* key, const int keyLength,
+                            const char* value,
+                            const int valueLength )
+{
+    // operator[] creates the entry for a new key and returns the
+    // existing one otherwise, so a repeated key overwrites its entry.
+    // Only the pointers are recorded; the strings are not copied.
+    HeaderInfo& entry = headers[key];
+
+    entry.key = key;
+    entry.keyLength = keyLength;
+    entry.value = value;
+    entry.valueLength = valueLength;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+const StompFrame::HeaderInfo* StompFrame::getHeaderInfo( const char* name ) const{
+
+    // The result points at the entry stored inside the header map;
+    // NULL means the frame has no header with that name.
+    const headersType::const_iterator found = headers.find( name );
+    if( found != headers.end() ){
+        return &found->second;
+    }
+
+    return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompFrame::setBodyText( const char* text, const int length ){
+
+    // Only the pointer and the length are recorded; the text itself is
+    // not copied and no header is touched.
+    this->bodyLength = length;
+    this->body = text;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompFrame::setBodyBytes( const char* bytes, int numBytes ){
+
+    // Record the buffer; the bytes themselves are not copied.
+    this->body = bytes;
+    this->bodyLength = numBytes;
+
+    // Render the byte count into the frame's own scratch buffer, which
+    // the content-length header then points at. sprintf returns the
+    // number of characters written, i.e. the length of that string.
+    const int digits = sprintf( bodyLengthStr, "%d", numBytes );
+
+    setHeader( standardHeaders[HEADER_CONTENTLENGTH],
+               standardHeaderLengths[HEADER_CONTENTLENGTH],
+               bodyLengthStr,
+               digits );
+}
+
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,274 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPFRAMEWRAPPER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPFRAMEWRAPPER_H_
+
+#include <vector>
+#include <string>
+#include <map>
+#include <list>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+  * A Stomp-level message frame that encloses all messages
+  * to and from the broker.
+  *
+  * A frame records the header key/value and body pointers it is
+  * given without copying them, so that memory must stay valid for as
+  * long as the frame is used. The static name tables are filled on
+  * first use; that first use is not synchronized.
+  * @author Nathan Mittler
+  */
+ class StompFrame{
+
+ public: // StandardHeader enumeration
+
+     enum StandardHeader{
+         HEADER_DESTINATION,
+         HEADER_TRANSACTIONID,
+         HEADER_CONTENTLENGTH,
+         HEADER_SESSIONID,
+         HEADER_RECEIPTID,
+         HEADER_MESSAGEID,
+         HEADER_ACK,
+         HEADER_LOGIN,
+         HEADER_PASSWORD,
+         HEADER_MESSAGE,
+         NUM_STANDARD_HEADERS
+     };
+
+     /**
+      * @return The name of the header, or NULL when the value is not
+      * a real header (for example NUM_STANDARD_HEADERS).
+      */
+     static const char* toString( const StandardHeader header ){
+         ensureStaticInit();
+         return inRange( header, NUM_STANDARD_HEADERS ) ? standardHeaders[header] : NULL;
+     }
+
+     /** @return The length of the header's name, or 0 when it has none. */
+     static int getStandardHeaderLength( const StandardHeader header ){
+         ensureStaticInit();
+         return inRange( header, NUM_STANDARD_HEADERS ) ? standardHeaderLengths[header] : 0;
+     }
+
+     /**
+      * @param header Null-terminated header name.
+      * @return The matching header, or NUM_STANDARD_HEADERS when the
+      * name is NULL or unknown.
+      */
+     static StandardHeader toStandardHeader( const char* header ){
+         return static_cast<StandardHeader>(
+             indexOf( standardHeaders, NUM_STANDARD_HEADERS, header ) );
+     }
+
+ public: // Command enumeration
+
+     enum Command{
+         COMMAND_CONNECT,
+         COMMAND_CONNECTED,
+         COMMAND_DISCONNECT,
+         COMMAND_SUBSCRIBE,
+         COMMAND_UNSUBSCRIBE,
+         COMMAND_MESSAGE,
+         COMMAND_SEND,
+         COMMAND_BEGIN,
+         COMMAND_COMMIT,
+         COMMAND_ABORT,
+         COMMAND_ACK,
+         COMMAND_ERROR,
+         NUM_COMMANDS
+     };
+
+     /**
+      * @return The name of the command, or NULL when the value is not
+      * a real command (for example NUM_COMMANDS).
+      */
+     static const char* toString( const Command cmd ){
+         ensureStaticInit();
+         return inRange( cmd, NUM_COMMANDS ) ? commands[cmd] : NULL;
+     }
+
+     /** @return The length of the command's name, or 0 when it has none. */
+     static int getCommandLength( const Command cmd ){
+         ensureStaticInit();
+         return inRange( cmd, NUM_COMMANDS ) ? commandLengths[cmd] : 0;
+     }
+
+     /**
+      * @param cmd Null-terminated command name.
+      * @return The matching command, or NUM_COMMANDS when the name is
+      * NULL or unknown.
+      */
+     static Command toCommand( const char* cmd ){
+         return static_cast<Command>( indexOf( commands, NUM_COMMANDS, cmd ) );
+     }
+
+ public: // AckMode enumeration
+
+     enum AckMode{
+         ACK_CLIENT,
+         ACK_AUTO,
+         NUM_ACK_MODES
+     };
+
+     /**
+      * @return The name of the ack mode, or NULL when the value is not
+      * a real mode (for example NUM_ACK_MODES).
+      */
+     static const char* toString( const AckMode mode ){
+         ensureStaticInit();
+         return inRange( mode, NUM_ACK_MODES ) ? ackModes[mode] : NULL;
+     }
+
+     /** @return The length of the mode's name, or 0 when it has none. */
+     static int getAckModeLength( const AckMode mode ){
+         ensureStaticInit();
+         return inRange( mode, NUM_ACK_MODES ) ? ackModeLengths[mode] : 0;
+     }
+
+     /**
+      * @param mode Null-terminated ack mode name.
+      * @return The matching mode, or NUM_ACK_MODES when the name is
+      * NULL or unknown.
+      */
+     static AckMode toAckMode( const char* mode ){
+         return static_cast<AckMode>( indexOf( ackModes, NUM_ACK_MODES, mode ) );
+     }
+
+ public:
+
+     /**
+      * One header entry. Both pointers are the ones passed to
+      * setHeader; the frame does not own the strings.
+      */
+     struct HeaderInfo{
+         const char* key;
+         int keyLength;
+         const char* value;
+         int valueLength;
+     };
+
+ public:
+
+     /**
+      * Default constructor.
+      */
+     StompFrame();
+
+     /**
+      * Destructor. The frame does not free the header or body memory
+      * it points at.
+      */
+     virtual ~StompFrame();
+
+     /**
+      * Sets the command for this stomp frame.
+      * @param cmd The command to be set.
+      */
+     virtual void setCommand( Command cmd ){
+         command = cmd;
+     }
+
+     /**
+      * Accessor for this frame's command field.
+      */
+     virtual Command getCommand() const{
+         return command;
+     }
+
+     /**
+      * Sets one of the standard headers in this frame.
+      * @param key The header to set; a value that is not a real
+      * header is ignored.
+      * @param value The value to set for the header.
+      * @param valueLength The length of the value string.
+      */
+     virtual void setHeader( const StandardHeader key,
+                             const char* value,
+                             const int valueLength ){
+         const char* name = toString( key );
+         if( name == NULL ){
+             return;
+         }
+         setHeader( name, getStandardHeaderLength( key ),
+                    value,
+                    valueLength );
+     }
+
+     /**
+      * Sets the given header in this frame.
+      * @param key The name of the header to be set.
+      * @param keyLength The string length of the key.
+      * @param value The value to set for the header.
+      * @param valueLength The length of the value string.
+      */
+     virtual void setHeader( const char* key,
+                             const int keyLength,
+                             const char* value,
+                             const int valueLength );
+
+     /**
+      * Accessor for the information of one of the standard headers.
+      * @param name The header to look up.
+      * @return The header's information, or NULL when the frame does
+      * not have it or name is not a real header.
+      */
+     virtual const HeaderInfo* getHeaderInfo( const StandardHeader name ) const{
+         const char* text = toString( name );
+         return ( text != NULL ) ? getHeaderInfo( text ) : NULL;
+     }
+
+     /**
+      * Accessor for the value of the given header information.
+      * @param name The name of the header to lookup.
+      * @return The information for the given header.
+      */
+     virtual const HeaderInfo* getHeaderInfo( const char* name ) const;
+
+     /**
+      * Restarts the header iteration.
+      * @return The first header, or NULL when there are none.
+      */
+     virtual const HeaderInfo* getFirstHeader(){
+         pos = headers.begin();
+         return getNextHeader();
+     }
+
+     /**
+      * Advances the header iteration started by getFirstHeader.
+      * @return The next header, or NULL once all have been returned.
+      */
+     virtual const HeaderInfo* getNextHeader(){
+         if( pos == headers.end() ){
+             return NULL;
+         }
+
+         const HeaderInfo* info = &(pos->second);
+
+         ++pos;
+         return info;
+     }
+
+     /** @return The number of headers currently set. */
+     virtual const int getNumHeaders() const{
+         return static_cast<int>( headers.size() );
+     }
+
+     /**
+      * Accessor for the body data of this frame.
+      */
+     virtual const char* getBody() const{
+         return body;
+     }
+
+     /** @return The length last passed to setBodyText/setBodyBytes. */
+     virtual int getBodyLength() const{ return bodyLength; }
+
+     /**
+      * Sets the body data of this frame as a text string.
+      * @param text The data to set in the body.
+      * @param textLength The length of the text string.
+      */
+     virtual void setBodyText( const char* text, const int textLength );
+
+     /**
+      * Sets the body data of this frame as a byte sequence. Adds the
+      * content-length header to specify the number of bytes in the
+      * sequence.
+      * @param bytes The byte buffer to be set in the body.
+      * @param numBytes The number of bytes in the buffer.
+      */
+     virtual void setBodyBytes( const char* bytes, const int numBytes );
+
+ private:
+
+     typedef std::map< std::string, HeaderInfo> headersType;
+
+ private:
+
+     /** Fills the static name and length tables. */
+     static void staticInit();
+
+     /**
+      * Fills the static tables if that has not happened yet, so the
+      * static accessors also work before any frame is constructed.
+      */
+     static void ensureStaticInit(){
+         if( !staticInitialized ){
+             staticInit();
+         }
+     }
+
+     /** @return true when index is a valid position in a table of count entries. */
+     static bool inRange( const int index, const int count ){
+         return index >= 0 && index < count;
+     }
+
+     /**
+      * Finds text in one of the name tables.
+      * @return The index of the matching entry, or count when text is
+      * NULL or matches no entry.
+      */
+     static int indexOf( const char* const* table, const int count, const char* text ){
+         ensureStaticInit();
+         if( text == NULL ){
+             return count;
+         }
+         for( int ix=0; ix<count; ++ix ){
+             // Same pointer is a cheap match; otherwise compare the text.
+             if( text == table[ix] ||
+                 ( table[ix] != NULL && strcmp( text, table[ix] ) == 0 ) ){
+                 return ix;
+             }
+         }
+         return count;
+     }
+
+ private:
+
+     Command command;
+
+     headersType headers;
+
+     const char* body;
+     int bodyLength;
+     char bodyLengthStr[20];
+
+     headersType::const_iterator pos;
+
+     static bool staticInitialized;
+     static const char* standardHeaders[NUM_STANDARD_HEADERS];
+     static int standardHeaderLengths[NUM_STANDARD_HEADERS];
+     static const char* commands[NUM_COMMANDS];
+     static int commandLengths[NUM_COMMANDS];
+     static const char* ackModes[NUM_ACK_MODES];
+     static int ackModeLengths[NUM_ACK_MODES];
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPFRAMEWRAPPER_H_ */
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StompIO.h"
+
+using namespace activemq::transport::stomp;
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+// Wraps the given streams.  The pointers are only stored here; this class
+// never deletes them (the destructor is empty).
+// @param istream stream that frames are read from
+// @param ostream stream that frames are written to
+StompIO::StompIO( InputStream* istream, OutputStream* ostream )
+{
+    this->istream = istream;
+    this->ostream = ostream;
+
+    // Start with an empty read buffer so that the offset is never used
+    // while still indeterminate.
+    readBufferPos = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Nothing to release: the destructor neither closes nor deletes the
+// wrapped streams.
+StompIO::~StompIO()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads characters from the input stream into buf until a line feed has
+// been consumed or bufLen characters have been stored.  The line feed is
+// stored as a null terminator.
+// @param buf destination buffer
+// @param bufLen capacity of buf
+// @return the number of characters stored, terminator included.  A result
+// equal to bufLen may mean the line did not fit (no terminator written).
+int StompIO::readStompHeaderLine( char* buf, const int bufLen ) throw (ActiveMQException){
+
+    int count = 0;
+    while( count < bufLen ){
+
+        const char next = istream->read();
+
+        if( next == '\n' ){
+
+            // End of line: terminate the string in place of the line feed.
+            buf[count] = '\0';
+            return count + 1;
+        }
+
+        buf[count] = next;
+        ++count;
+    }
+
+    // Ran out of room before seeing a line feed.
+    return count;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads the frame body from the input stream into buf.  Reading stops when
+// the very first character is a null, when a null immediately followed by
+// a line feed has been stored, or when bufLen characters have been stored.
+// @param buf destination buffer
+// @param bufLen capacity of buf
+// @return the number of characters stored, terminating characters included
+int StompIO::readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQException){
+
+    int count = 0;
+    while( count < bufLen ){
+
+        buf[count] = istream->read();
+        ++count;
+
+        // A null as the first character means there is no body at all.
+        const bool emptyBody = ( count == 1 && buf[0] == '\0' );
+
+        // Otherwise the body ends with a null followed by a line feed.
+        const bool terminated = ( count >= 2 &&
+                                  buf[count-2] == '\0' &&
+                                  buf[count-1] == '\n' );
+
+        if( emptyBody || terminated ){
+            return count;
+        }
+    }
+
+    // The buffer filled up before the terminator was seen.
+    return count;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads the command line of a frame into the start of the read buffer and
+// stores the parsed command in the frame.  Resets the buffer offset first,
+// because the command is always the first element of a frame.
+// @param frame frame that receives the command
+// @throws ActiveMQException if the line fills the whole read buffer
+void StompIO::readStompCommand( StompFrame& frame ) throw (ActiveMQException){
+
+    // A new frame always starts at the beginning of the buffer.
+    readBufferPos = 0;
+
+    const int lineLength = readStompHeaderLine( readBuffer, readBufferSize );
+    if( lineLength >= readBufferSize ){
+        throw ActiveMQException( "readStompCommand: exceeded buffer size" );
+    }
+
+    // Convert the command text to its enumerated value.
+    frame.setCommand( StompFrame::toCommand(readBuffer) );
+
+    // The headers start right after the command line.
+    readBufferPos = lineLength;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads header lines into the read buffer, one after another, until a line
+// whose first character is the terminator (an empty line) has been read.
+// Each "key:value" line is split in place at the first colon and handed to
+// the frame as pointers into the read buffer.
+// @param frame frame that receives the headers
+// @throws ActiveMQException if the read buffer is exhausted or nothing
+// could be read
+void StompIO::readStompHeaders( StompFrame& frame ) throw (ActiveMQException){
+
+    bool endOfHeaders = false;
+    do{
+
+        // Read the next line into the unused tail of the buffer.
+        char* const line = readBuffer + readBufferPos;
+        const int lineLength = readStompHeaderLine(
+            line,
+            readBufferSize - readBufferPos );
+        if( readBufferPos + lineLength >= readBufferSize ){
+            throw ActiveMQException( "readStompHeaders: exceeded buffer size" ); // should never get here
+        }
+        if( lineLength == 0 ){
+            throw ActiveMQException( "readStompHeaders: no characters read" ); // should never get here
+        }
+
+        // An empty line marks the end of the header section.
+        endOfHeaders = ( line[0] == '\0' );
+
+        // Look for the first colon, which separates key from value.
+        for( int offset = 0; offset < lineLength; ++offset ){
+
+            if( line[offset] != ':' ){
+                continue;
+            }
+
+            // Turn the colon into the key's null terminator.
+            line[offset] = '\0';
+
+            const int keyLen = offset;
+
+            // The value excludes the colon and the line terminator.
+            const int valLen = lineLength - keyLen - 2;
+
+            frame.setHeader( line,
+                keyLen,
+                line + offset + 1,
+                valLen );
+
+            // Only the first colon on a line is significant.
+            break;
+        }
+
+        // The next line goes right behind this one.
+        readBufferPos += lineLength;
+
+    }while( !endOfHeaders );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads the frame body into the read buffer behind the headers and hands
+// it to the frame.  The length passed to the frame is the full count
+// returned by readStompBodyLine.
+// @param frame frame that receives the body
+// @throws ActiveMQException if the read buffer is exhausted or nothing
+// could be read
+void StompIO::readStompBody( StompFrame& frame ) throw (ActiveMQException){
+
+    // The body goes into the unused tail of the buffer.
+    char* const bodyStart = readBuffer + readBufferPos;
+    const int bodyLength = readStompBodyLine(
+        bodyStart,
+        readBufferSize - readBufferPos );
+
+    if( readBufferPos + bodyLength >= readBufferSize ){
+        throw ActiveMQException( "readStompBody: exceeded buffer size" ); // should never get here
+    }
+    if( bodyLength == 0 ){
+        throw ActiveMQException( "readStompBody: no characters read" ); // should never get here
+    }
+
+    // Hand the body to the frame.
+    frame.setBodyText( bodyStart, bodyLength );
+
+    // Advance past the body.
+    readBufferPos += bodyLength;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads one complete frame from the input stream: the command line, then
+// the headers, then the body, in that order.
+// @return the address of the member "frame".  The same object is filled
+// in again by the next call, and callers must not delete it.
+// @throws ActiveMQException propagated from the three read steps
+StompFrame* StompIO::readStompFrame() throw (ActiveMQException){
+
+ // Read the command into the frame.
+ readStompCommand( frame );
+
+ // Read the headers.
+ readStompHeaders( frame );
+
+ // Read the body.
+ readStompBody( frame );
+
+ return &frame;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Serializes a frame to the output stream: the command line, one
+// "key:value" line per header, an empty line, the body (if any) and a
+// terminating null character.  The stream is flushed afterwards.
+// @param frame the frame to write
+void StompIO::writeStompFrame( StompFrame& frame ) throw( ActiveMQException ){
+
+    // Command line.
+    const StompFrame::Command command = frame.getCommand();
+    write( StompFrame::toString( command ), StompFrame::getCommandLength( command ) );
+    write( '\n' );
+
+    // One line per header, walking the frame's header iterator.
+    const StompFrame::HeaderInfo* header = frame.getFirstHeader();
+    while( header != NULL ){
+
+        write( header->key, header->keyLength );
+        write( ':' );
+        write( header->value, header->valueLength );
+        write( '\n' );
+
+        header = frame.getNextHeader();
+    }
+
+    // An empty line (a lone line feed) ends the header section.
+    write( '\n' );
+
+    // Body, when the frame has one.
+    const char* const payload = frame.getBody();
+    if( payload != NULL ) {
+        write( payload, frame.getBodyLength() );
+    }
+
+    // The null character terminates the frame.
+    write( '\0' );
+
+    // Push everything out to the underlying stream.
+    flush();
+}
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPIO_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPIO_H_
+
+#include <activemq/transport/stomp/StompInputStream.h>
+#include <activemq/transport/stomp/StompOutputStream.h>
+#include <activemq/ActiveMQException.h>
+#include <activemq/transport/stomp/StompFrame.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Input/Output stream for stomp frames. Wraps one io::InputStream and
+ * one io::OutputStream: the plain read/write/flush/close calls are
+ * forwarded to them unchanged, while readStompFrame/writeStompFrame
+ * (defined in StompIO.cpp) operate on whole frames.
+ * @author Nathan Mittler
+ */
+ class StompIO
+ :
+ public StompInputStream,
+ public StompOutputStream
+ {
+ public:
+
+ // Takes the streams to read from and write to (defined in StompIO.cpp).
+ StompIO( io::InputStream* stream, io::OutputStream* ostream );
+ virtual ~StompIO();
+
+ // Forwards to the wrapped input stream.
+ virtual int available() const{ return istream->available(); }
+
+ // Reads a single character from the wrapped input stream.
+ virtual char read() throw (ActiveMQException){ return istream->read(); }
+
+ // Reads into buffer via the wrapped input stream and returns its result.
+ virtual int read( char* buffer, const int bufferSize ) throw (ActiveMQException){
+ return istream->read( buffer, bufferSize );
+ }
+
+ // Reads one whole frame; defined in StompIO.cpp.
+ virtual StompFrame* readStompFrame() throw (ActiveMQException);
+
+ // Writes a single character to the wrapped output stream.
+ virtual void write( const char c ) throw (ActiveMQException){
+ ostream->write( c );
+ }
+
+ // Writes len characters from buffer to the wrapped output stream.
+ virtual void write( const char* buffer, const int len ) throw (ActiveMQException){
+ ostream->write( buffer, len );
+ }
+
+ // Flushes the wrapped output stream.
+ virtual void flush() throw (ActiveMQException){ ostream->flush(); }
+
+ // Writes one whole frame; defined in StompIO.cpp.
+ virtual void writeStompFrame( StompFrame& frame ) throw( ActiveMQException );
+
+ // Closes the input stream first, then the output stream. If closing the
+ // input stream throws, the output stream is not closed by this call.
+ virtual void close() throw(cms::CMSException){
+ istream->close();
+ ostream->close();
+ }
+
+ private:
+
+ // Helpers used by readStompFrame. The two line readers return the
+ // number of characters they stored in buf.
+ int readStompHeaderLine( char* buf, const int bufLen ) throw (ActiveMQException);
+ int readStompBodyLine( char* buf, const int bufLen ) throw (ActiveMQException);
+ void readStompCommand( StompFrame& frame ) throw (ActiveMQException);
+ void readStompHeaders( StompFrame& frame ) throw (ActiveMQException);
+ void readStompBody( StompFrame& frame ) throw (ActiveMQException);
+
+ private:
+
+ // The streams.
+ io::InputStream* istream;
+ io::OutputStream* ostream;
+
+ // The stomp frame for reads.
+ StompFrame frame;
+
+ // Make a 1-Meg buffer so we should never run out of space with a single frame.
+ // The array is a direct member, so every StompIO object carries the
+ // full 1,000,000 bytes. readBufferPos is the offset of the first
+ // unused character while a frame is being read.
+ static const int readBufferSize = 1000000;
+ char readBuffer[readBufferSize];
+ int readBufferPos;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPIO_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPINPUTSTREAM_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPINPUTSTREAM_H_
+
+#include <activemq/io/InputStream.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ // Forward declarations.
+ class StompFrame;
+
+ /**
+ * Stream that can read stomp frames. Extends io::InputStream with a
+ * single frame-level read operation; this header declares no
+ * implementation of its own.
+ * @author Nathan Mittler
+ */
+ class StompInputStream : public io::InputStream{
+ public:
+
+ virtual ~StompInputStream(){};
+
+ // Reads the next frame from the stream.
+ // NOTE(review): ownership of the returned pointer is not stated by this
+ // interface - confirm against the implementing class before deleting.
+ virtual StompFrame* readStompFrame() throw (ActiveMQException) = 0;
+ };
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPINPUTSTREAM_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPMESSAGE_H_
+
+#include <cms/Message.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * Interface for all stomp messages received from the
+ * broker.
+ * NOTE(review): the type list below also names CONNECT, SUBSCRIBE and
+ * similar types, so this interface presumably covers messages sent to
+ * the broker as well - confirm.
+ * @author Nathan Mittler
+ */
+ class StompMessage{
+ public:
+
+ // Identifies the concrete kind of a message. NUM_MSG_TYPES is not a
+ // message type: as the last enumerator its value is the number of
+ // types declared before it.
+ enum MessageType{
+ MSG_CONNECT,
+ MSG_CONNECTED,
+ MSG_DISCONNECT,
+ MSG_SUBSCRIBE,
+ MSG_UNSUBSCRIBE,
+ MSG_TEXT,
+ MSG_BYTES,
+ MSG_BEGIN,
+ MSG_COMMIT,
+ MSG_ABORT,
+ MSG_ACK,
+ MSG_ERROR,
+ NUM_MSG_TYPES
+ };
+
+ public:
+
+ virtual ~StompMessage(){}
+
+ // Returns the kind of this message.
+ virtual MessageType getMessageType() const = 0;
+
+ // Returns this message viewed as a cms::Message (const and non-const
+ // overloads).
+ // NOTE(review): whether an implementation may return NULL is not
+ // stated here - confirm against the implementing classes.
+ virtual const cms::Message* getCMSMessage() const = 0;
+ virtual cms::Message* getCMSMessage() = 0;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPMESSAGE_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_SERVERMESSAGELISTENER_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_SERVERMESSAGELISTENER_H_
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ class StompMessage;
+
+ /**
+ * Observer of stomp messages. Implementations receive each message
+ * through onStompMessage.
+ */
+ class StompMessageListener{
+ public:
+ virtual ~StompMessageListener(){}
+
+ // Called with the message being delivered. The pointer is to const, so
+ // the listener cannot modify the message through it.
+ // NOTE(review): the lifetime of msg after the call returns is not
+ // stated here - confirm with the caller before keeping the pointer.
+ virtual void onStompMessage( const StompMessage* msg ) = 0;
+ };
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_SERVERMESSAGELISTENER_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPOUTPUTSTREAM_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPOUTPUTSTREAM_H_
+
+#include <activemq/io/OutputStream.h>
+#include <activemq/transport/stomp/StompFrame.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ /**
+ * An output stream for stomp frames. Extends io::OutputStream with a
+ * single frame-level write operation; this header declares no
+ * implementation of its own.
+ * @author Nathan Mittler
+ */
+ class StompOutputStream : public io::OutputStream{
+ public:
+
+ virtual ~StompOutputStream(){}
+
+ // Writes the given frame to the stream. The frame is taken by non-const
+ // reference.
+ virtual void writeStompFrame( StompFrame& frame ) throw( ActiveMQException ) = 0;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPOUTPUTSTREAM_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,150 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPTEXTMESSAGE_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPTEXTMESSAGE_H_
+
+#include <activemq/transport/stomp/DestinationMessage.h>
+#include <activemq/transport/stomp/TransactionMessage.h>
+#include <cms/TextMessage.h>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+    /**
+     * A stomp text message implementation.  The text is held either as a
+     * private copy (setText) or as a borrowed pointer (setTextNoCopy); the
+     * "own" flag records which, so that only private copies are freed.
+     *
+     * NOTE(review): no copy constructor or assignment operator is declared,
+     * so the compiler-generated ones would alias the text buffer and the
+     * transaction id.  Use clone() to duplicate a message.
+     * @author Nathan Mittler
+     */
+    class StompTextMessage
+    :
+        public DestinationMessage,
+        public TransactionMessage,
+        public cms::TextMessage
+    {
+    public:
+
+        /** Creates an empty, non-transactional message with no text. */
+        StompTextMessage(){
+            transactionId = NULL;
+            own = false;
+            message = NULL;
+        }
+
+        /** Frees the transaction id and, if owned, the text buffer. */
+        virtual ~StompTextMessage(){
+
+            // Deleting a null pointer is a no-op, so no check is needed.
+            delete transactionId;
+            clear();
+        }
+
+        virtual MessageType getMessageType() const{
+            return MSG_TEXT;
+        }
+        virtual const cms::Message* getCMSMessage() const{
+            return this;
+        }
+        virtual cms::Message* getCMSMessage(){
+            return this;
+        }
+
+        /**
+         * Sets the destination name.
+         * @param destination the name; NULL is treated as an empty name.
+         */
+        virtual void setDestination( const char* destination ){
+
+            // Assigning a null pointer to a std::string is undefined.
+            this->destination = ( destination != NULL ) ? destination : "";
+        }
+
+        virtual const char* getDestination() const{
+            return destination.c_str();
+        }
+
+        /** @return true once a transaction id has been set. */
+        virtual bool isTransaction() const{
+            return transactionId != NULL;
+        }
+
+        /** @return the transaction id, or NULL if not in a transaction. */
+        virtual const char* getTransactionId() const{
+            if( isTransaction() ){
+                return transactionId->c_str();
+            }
+            return NULL;
+        }
+
+        /**
+         * Sets the transaction id, replacing any previous one.
+         * @param id the new id; NULL makes the message non-transactional.
+         */
+        virtual void setTransactionId( const char* id ){
+
+            delete transactionId;
+            transactionId = NULL;
+
+            // Constructing a std::string from a null pointer is undefined.
+            if( id != NULL ){
+                transactionId = new std::string( id );
+            }
+        }
+
+        /** @return the text, or NULL if none has been set. */
+        virtual const char* getText() const throw( cms::CMSException ){
+            return message;
+        }
+
+        /**
+         * Stores a private copy of the given text.
+         * @param msg null-terminated text; NULL leaves the message empty.
+         */
+        virtual void setText( const char* msg ) throw( cms::CMSException ){
+
+            clear();
+
+            if( msg == NULL ){
+                return;
+            }
+
+            const size_t len = strlen( msg );
+
+            // Copy the text including its terminator.
+            char* copy = new char[len+1];
+            memcpy( copy, msg, len + 1 );
+
+            message = copy;
+            own = true;
+        }
+
+        /**
+         * Stores the given pointer without copying the text.  The caller
+         * keeps ownership and must keep the text alive for as long as this
+         * message refers to it.
+         * @param msg the text to refer to.
+         */
+        virtual void setTextNoCopy( const char* msg ) throw( cms::CMSException ){
+
+            clear();
+
+            own = false;
+            message = const_cast<char*>(msg);
+        }
+
+        virtual void acknowledge() throw( cms::CMSException ){
+        }
+
+        /**
+         * Creates an independent copy of this message.  The text is always
+         * deep-copied so the clone stays valid after this message (or the
+         * buffer it borrows) is destroyed.
+         * @return a new message that the caller must delete.
+         */
+        virtual cms::Message* clone() const{
+            StompTextMessage* msg = new StompTextMessage();
+            msg->destination = destination;
+            if( transactionId != NULL ){
+                msg->setTransactionId( transactionId->c_str() );
+            }
+            if( message != NULL ){
+                msg->setText( message );
+            }
+            return msg->getCMSMessage();
+        }
+
+    private:
+
+        /** Drops the current text, freeing it only if this object owns it. */
+        void clear(){
+
+            if( own ){
+                delete [] message;
+            }
+            message = NULL;
+            own = false;
+        }
+
+    private:
+
+        std::string destination;
+
+        // Heap-allocated so that NULL can mean "not in a transaction".
+        std::string* transactionId;
+
+        // The text and whether this object must free it.
+        char* message;
+        bool own;
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPTEXTMESSAGE_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,482 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StompTransport.h"
+#include "ConnectMessage.h"
+#include "DisconnectMessage.h"
+#include "ErrorMessage.h"
+#include "SubscribeMessage.h"
+#include "UnsubscribeMessage.h"
+#include "StompTextMessage.h"
+#include "StompBytesMessage.h"
+#include "StompFrame.h"
+
+#include <activemq/ActiveMQTopic.h>
+#include <activemq/concurrent/Lock.h>
+#include <activemq/transport/TopicListener.h>
+
+#include <errno.h>
+#include <stdio.h>
+#include <time.h>
+
+using namespace activemq;
+using namespace activemq::transport::stomp;
+using namespace activemq::io;
+using namespace activemq::concurrent;
+using namespace cms;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+// Records the connection settings.  Nothing is connected until start() is
+// called.
+// @param host broker host name
+// @param port broker port
+// @param userName login sent in the connect message; NULL means empty
+// @param password password sent in the connect message; NULL means empty
+StompTransport::StompTransport( const char* host,
+    const int port,
+    const char* userName,
+    const char* password )
+{
+    this->host = host;
+    this->port = port;
+
+    // start() sends the userName/password members to the broker, so the
+    // arguments have to be stored; previously they were silently dropped.
+    this->userName = ( userName != NULL ) ? userName : "";
+    this->password = ( password != NULL ) ? password : "";
+
+    started = false;
+    readerThread = 0;
+    stompIO = NULL;
+    bufferedInputStream = NULL;
+    bufferedOutputStream = NULL;
+    killThread = false;
+    exceptionListener = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Disconnects from the broker and releases the streams.
+StompTransport::~StompTransport()
+{
+    // close() may throw, and an exception escaping a destructor during
+    // stack unwinding terminates the program - so contain it here.
+    try{
+        close();
+    }catch( ... ){
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Connects to the broker, performs the connect handshake and starts the
+// reader thread.  If the socket is already connected, only the started
+// flag is set.
+// @throws CMSException if connecting, the handshake or the thread creation
+// fails; the transport is closed and the listener notified before the
+// exception is rethrown.
+void StompTransport::start() throw( CMSException ){
+
+    try{
+
+        if( socket.isConnected() ){
+            started = true;
+            return;
+        }
+
+        // Initialize in the stopped state - when the thread starts, it
+        // will put us in the started state.
+        started = false;
+
+        // Create the new connection
+        socket.connect( host.c_str(), port );
+        socket.setSoReceiveTimeout( 10000 );
+        socket.setSoLinger( 0 );
+        socket.setKeepAlive( true );
+
+        // Create the streams and wire them up.
+        bufferedInputStream = new BufferedInputStream( socket.getInputStream(), 100000 );
+        bufferedOutputStream = new BufferedOutputStream( socket.getOutputStream(), 100000 );
+        stompIO = new StompIO( bufferedInputStream, bufferedOutputStream );
+
+        // Send the connect request.
+        ConnectMessage msg;
+        msg.setLogin( userName );
+        msg.setPassword( password );
+        sendMessage( &msg );
+
+        // Sleep for 10 milliseconds for the broker to process the
+        // connect message.
+        milliSleep( 10 );
+
+        // Get the response.
+        // NOTE(review): the response is never deleted here - confirm who
+        // owns the messages returned by readNextMessage().
+        StompMessage* response = readNextMessage();
+        if( response == NULL || response->getMessageType() != StompMessage::MSG_CONNECTED ){
+
+            // The handler below closes the connection.
+            throw ActiveMQException( "stomp::StompTransport::connect - error reading connect response from broker" );
+        }
+
+        // Start the reader thread.
+        killThread = false;
+        if( pthread_create( &readerThread, NULL, runCallback, this ) != 0 ){
+
+            // Without a reader thread the started flag is never set, so
+            // waiting for it below would spin forever.  The thread id is
+            // unspecified after a failed create; reset it so that close()
+            // does not try to join it.
+            readerThread = 0;
+            throw ActiveMQException( "stomp::StompTransport::start - unable to create reader thread" );
+        }
+
+        // Wait for the started flag to be set.
+        while( !started ){
+            milliSleep(10);
+        }
+
+    }catch( ActiveMQException& ex ){
+
+        // Clean up, but do not let a failure during cleanup replace the
+        // error that actually caused the start to fail.
+        try{
+            close();
+        }catch( ... ){
+        }
+
+        notify( ex );
+
+        // Rethrow the original exception object rather than a copy.
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Suspends the calling thread for the given number of milliseconds.
+// @param millis time to sleep, in milliseconds
+void StompTransport::milliSleep( const long millis ){
+
+    timespec sleepTime;
+    sleepTime.tv_sec = millis/1000;
+    sleepTime.tv_nsec = (millis - (sleepTime.tv_sec * 1000)) * 1000000;
+
+    // Loop until the full time has elapsed.  When a signal interrupts the
+    // sleep, nanosleep stores the remaining time in its second argument,
+    // so sleeping again on the same structure resumes where it left off.
+    // Any other failure ends the loop.
+    while( nanosleep( &sleepTime, &sleepTime ) == -1 && errno == EINTR ){
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Stops the reader thread, sends a disconnect message if still connected
+// and releases the socket and streams.
+// @throws CMSException if sending the disconnect message or closing the
+// socket fails.  The socket and streams are released in either case.
+void StompTransport::close() throw( CMSException ){
+
+    // Wait for the reader thread to die.
+    if( readerThread != 0 ){
+        killThread = true;
+        pthread_join( readerThread, NULL );
+        readerThread = 0;
+    }
+
+    // Send the disconnect message.
+    try{
+
+        if( socket.isConnected() ){
+
+            DisconnectMessage msg;
+            sendMessage( &msg );
+        }
+
+    }catch( ... ){
+
+        // The connection is being torn down anyway: still release the
+        // socket and streams, then report the send failure.  (sendMessage
+        // has already notified the listener.)
+        try{
+            closeSocket();
+        }catch( ... ){
+        }
+        throw;
+    }
+
+    // Close the socket.  Exceptions are thrown by value in this code base,
+    // so they are caught by reference here.
+    try{
+        closeSocket();
+    }catch( ActiveMQException& ex ){
+        notify( ex );
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Closes the streams and the socket (when connected) and then destroys the
+// stream objects, leaving all three stream pointers NULL.
+void StompTransport::closeSocket(){
+
+    if( socket.isConnected() ){
+
+        // The stomp streams sit on top of the socket streams, so they
+        // have to be closed before the socket itself.
+        if( stompIO != NULL ){
+            stompIO->close();
+        }
+
+        socket.close();
+    }
+
+    // Destroy the streams.  Deleting a null pointer is a no-op, so the
+    // pointers need no individual checks.
+    delete stompIO;
+    stompIO = NULL;
+
+    delete bufferedInputStream;
+    bufferedInputStream = NULL;
+
+    delete bufferedOutputStream;
+    bufferedOutputStream = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Clears the started flag. Nothing else happens here: the socket, the
+// streams and the reader thread are left untouched.
+void StompTransport::stop() throw( CMSException ){
+
+ // Lock this class
+ // (The lock is currently disabled, so the flag is written without
+ // holding the mutex.)
+ //Lock lock( &mutex );
+
+ started = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports an exception to the registered exception listener, if any.
+// Anything the listener itself throws is caught and logged to stdout, so
+// this function never propagates an exception from the listener.
+// @param ex the exception to report
+void StompTransport::notify( const ActiveMQException& ex ){
+
+    // Nothing to do when no listener is registered.
+    if( exceptionListener == NULL ){
+        return;
+    }
+
+    try{
+        exceptionListener->onException( &ex );
+    }catch( ... ){
+        printf( "StompTransport::notify(ActiveMQException&) - caught exception notifying listener\n" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends a cms text or bytes message to the given topic by wrapping it in
+// the matching stomp message.  The payload is referenced, not copied.
+// Messages of any other type are silently ignored.
+// @param topic the topic to send to
+// @param message the message to send
+void StompTransport::sendMessage( const cms::Topic* topic, const cms::Message* message ){
+
+    if( const cms::TextMessage* text = dynamic_cast<const cms::TextMessage*>(message) ){
+
+        // Text payload.
+        StompTextMessage outgoing;
+        outgoing.setDestination( createDestinationName( topic ).c_str() );
+        outgoing.setTextNoCopy( text->getText() );
+        sendMessage( dynamic_cast<const DestinationMessage*>(&outgoing) );
+    }
+    else if( const cms::BytesMessage* bytes = dynamic_cast<const cms::BytesMessage*>(message) ){
+
+        // Binary payload.
+        StompBytesMessage outgoing;
+        outgoing.setDestination( createDestinationName( topic ).c_str() );
+        outgoing.setDataNoCopy( bytes->getData(), bytes->getNumBytes() );
+        sendMessage( dynamic_cast<const DestinationMessage*>(&outgoing) );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Adapts a stomp message to a frame and writes it to the broker.  If the
+// transport has no stream (not connected), nothing is written.
+// @param msg the message to send
+// @throws ActiveMQException if the message cannot be adapted or the write
+// fails; the exception listener is notified before the rethrow.
+void StompTransport::sendMessage( const StompMessage* msg ){
+
+    StompFrame* frame = NULL;
+
+    try{
+
+        // Adapt the message to a stomp frame object.
+        frame = protocolAdapter.adapt( msg );
+
+        // If the adaptation failed - throw an exception.
+        if( frame == NULL ){
+            throw ActiveMQException("Unable to adapt message type to stomp frame");
+        }
+
+        {
+            // Only the write itself needs the network lock.
+            Lock lock( &mutex );
+
+            // Write the data to the socket.
+            if( stompIO != NULL ){
+                stompIO->writeStompFrame( *frame );
+            }
+        }
+
+        // The error path below deletes the frame, so the caller owns it;
+        // it has to be released after a successful send as well.
+        // NOTE(review): confirm that protocolAdapter.adapt() returns a
+        // frame owned by the caller.
+        delete frame;
+        frame = NULL;
+
+    }catch( ActiveMQException& ex ){
+
+        // Destroy the frame
+        delete frame;
+
+        // Notify observers of the exception.
+        notify( ex );
+
+        // Rethrow the original exception object rather than a copy.
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reads the next frame from the broker and adapts it to a stomp message.
+// @return the adapted message, never NULL
+// @throws ActiveMQException if the transport has no stream, the read
+// fails or the frame cannot be adapted; the message is printed and the
+// exception listener notified before the rethrow.
+StompMessage* StompTransport::readNextMessage(){
+
+    try{
+
+        // Lock access to the network layer.
+        Lock lock( &mutex );
+
+        // Without a stream there is nothing to read from.
+        if( stompIO == NULL ){
+            throw ActiveMQException( "unable to read message - transport is not connected" );
+        }
+
+        // The frame belongs to the stream and is reused by the next read.
+        StompFrame* frame = stompIO->readStompFrame();
+
+        // Adapt the stomp frame to a message.
+        StompMessage* msg = protocolAdapter.adapt( frame );
+
+        // If the adaptation failed - throw an exception.
+        if( msg == NULL ){
+            throw ActiveMQException( "unable to adapt frame to message type" );
+        }
+
+        // return the message.
+        return msg;
+
+    }catch( ActiveMQException& ex ){
+
+        printf( "%s\n", ex.getMessage() );
+
+        // Notify observers of the exception.
+        notify( ex );
+
+        // Rethrow the original exception object rather than a copy.
+        throw;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Registers a listener for messages on the given topic. A subscribe
+ * message is sent to the broker when the destination had no listeners
+ * before this call.
+ * @param topic The topic to listen to.
+ * @param listener The listener to register.
+ */
+void StompTransport::addMessageListener( const Topic* topic, MessageListener* listener ){
+
+    Lock guard( &mutex );
+
+    // Stomp destination name for the topic.
+    std::string destinationName = createDestinationName( topic );
+
+    // This must be sampled before the listener is added to the pool.
+    const bool firstListener = !destinationPool.hasListeners( destinationName );
+
+    destinationPool.addListener( destinationName, listener );
+
+    // Only the first listener of a destination triggers a subscription.
+    if( firstListener ){
+        subscribe( destinationName );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Removes a listener from the given topic. An unsubscribe message is sent
+ * to the broker when the destination has no listeners left afterwards.
+ * @param topic The topic the listener was registered on.
+ * @param listener The listener to remove.
+ */
+void StompTransport::removeMessageListener( const Topic* topic, MessageListener* listener ){
+
+    Lock guard( &mutex );
+
+    // Stomp destination name for the topic.
+    std::string destinationName = createDestinationName( topic );
+
+    destinationPool.removeListener( destinationName, listener );
+
+    // Somebody is still listening - keep the subscription.
+    if( destinationPool.hasListeners( destinationName ) ){
+        return;
+    }
+
+    unsubscribe( destinationName );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends a subscribe message for the given destination, using the
+ * AUTO_ACKNOWLEDGE ack mode.
+ * @param destination The stomp destination name to subscribe to.
+ */
+void StompTransport::subscribe( const std::string& destination ){
+
+    const char* target = destination.c_str();
+
+    SubscribeMessage request;
+    request.setDestination( target );
+    request.setAckMode( Session::AUTO_ACKNOWLEDGE );
+
+    sendMessage( &request );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends an unsubscribe message for the given destination.
+ * @param destination The stomp destination name to unsubscribe from.
+ */
+void StompTransport::unsubscribe( const std::string& destination ){
+
+    const char* target = destination.c_str();
+
+    UnsubscribeMessage request;
+    request.setDestination( target );
+
+    sendMessage( &request );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Body of the reader thread. Marks the transport as started, then loops
+ * until killThread is set or the socket is no longer connected. Each
+ * iteration polls stompIO for data (sleeping 10 ms when there is none),
+ * reads one message and dispatches it:
+ *  - error messages are relayed to the exception listener,
+ *  - text and bytes messages are delivered to the destination pool,
+ *  - every other message type is dropped.
+ * Messages read while the transport is not started are dropped as well.
+ * Any exception closes the socket and is reported through notify().
+ */
+void StompTransport::run(){
+
+    // Set the data flow in the started state.
+    started = true;
+
+    // Loop until the thread is told to die.
+    while( !killThread && socket.isConnected() ){
+
+        StompMessage* msg = NULL;
+
+        try{
+
+            if( stompIO->available() == 0 ){
+
+                // No data is available - wait for a short time
+                // and try again.
+                milliSleep( 10 );
+                continue;
+            }
+
+            // There is data on the socket - read it.
+            msg = readNextMessage();
+
+            // If we got a message, notify listeners.
+            if( msg != NULL && started ){
+
+                switch( msg->getMessageType() ){
+                    case StompMessage::MSG_ERROR:{
+
+                        // This is an error frame - relay the error to the
+                        // ExceptionListener. The cast is checked so that a
+                        // message whose type tag does not match its class
+                        // is skipped instead of dereferencing NULL.
+                        const ErrorMessage* errorMessage = dynamic_cast<const ErrorMessage*>(msg);
+                        if( errorMessage != NULL ){
+                            string errStr = errorMessage->getErrorTitle();
+                            errStr += ". ";
+                            errStr += errorMessage->getErrorText();
+                            ActiveMQException ex( errStr.c_str() );
+                            notify( ex );
+                        }
+                        break;
+                    }
+                    case StompMessage::MSG_TEXT:
+                    case StompMessage::MSG_BYTES:{
+
+                        // Notify listeners of the destination message,
+                        // again guarding against a failed cast.
+                        const DestinationMessage* destMsg = dynamic_cast<const DestinationMessage*>( msg );
+                        if( destMsg != NULL ){
+                            destinationPool.notify( destMsg->getDestination(), destMsg->getCMSMessage() );
+                        }
+                        break;
+                    }
+                    default:{
+                        break;
+                    }
+                }
+            }
+
+        }catch( ActiveMQException& ex ){
+
+            // Close the socket.
+            try{
+                closeSocket();
+            }catch( ... ){
+                printf("run - caught exception closing socket\n" );
+            }
+
+            // Notify observers of the exception.
+            notify( ActiveMQException( (string)("stomp::StompTransport::run - caught exception\n\t") + ex.getMessage() ) );
+
+        }catch( ... ){
+
+            // Close the socket.
+            try{
+                closeSocket();
+            }catch( ... ){
+                printf("run - caught exception closing socket\n" );
+            }
+
+            // Notify observers of the exception.
+            notify( ActiveMQException( "stomp::StompTransport::run - unknown error reading message" ) );
+        }
+
+        // If a message was allocated in this iteration of the loop,
+        // delete it. (Deleting a NULL pointer is a no-op.)
+        delete msg;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Thread entry point: runs the reader loop of the transport passed in.
+ * @param instance Pointer to the StompTransport whose run() is executed.
+ * @return Always NULL.
+ */
+void* StompTransport::runCallback( void* instance ){
+
+    StompTransport* transport = static_cast<StompTransport*>( instance );
+    transport->run();
+
+    return NULL;
+}
+
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,200 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORT_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORT_H_
+
+#include <activemq/transport/Transport.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/transport/stomp/AggregateProtocolAdapter.h>
+#include <activemq/transport/stomp/StompFrame.h>
+#include <activemq/transport/stomp/StompIO.h>
+#include <activemq/transport/stomp/DestinationPool.h>
+#include <activemq/io/BufferedInputStream.h>
+#include <activemq/io/BufferedOutputStream.h>
+#include <activemq/io/Socket.h>
+#include <pthread.h>
+#include <vector>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+ // Forward declarations.
+ class TransportListener;
+
+ /**
+ * Implementation of the transport interface
+ * for the stomp protocol.
+ * NOTE(review): the class holds raw pointers and a thread handle but does
+ * not declare copy operations - confirm that instances are never copied.
+ * @author Nathan Mittler
+ */
+ class StompTransport : public Transport
+ {
+ public:
+
+ /**
+ * Creates a transport for the given broker address and login.
+ * NOTE(review): the constructor body is not part of this header;
+ * presumably the arguments are stored in the members of the same name.
+ * @param host The broker host name.
+ * @param port The broker port.
+ * @param userName The login for the broker.
+ * @param password The password for the broker login.
+ */
+ StompTransport( const char* host,
+ const int port,
+ const char* userName,
+ const char* password );
+ virtual ~StompTransport();
+
+ /**
+ * Disconnects from the broker.
+ */
+ virtual void close() throw (cms::CMSException);
+
+ /**
+ * Connects if necessary and starts the flow of messages to observers.
+ */
+ virtual void start() throw( cms::CMSException );
+
+ /**
+ * Stops the flow of messages to observers. Messages
+ * will not be saved, so messages arriving after this call
+ * will be lost.
+ */
+ virtual void stop() throw( cms::CMSException );
+
+ /**
+ * Sends a message to the broker on the given topic.
+ * @param topic The topic on which to send the message.
+ * @param message The message to send.
+ */
+ virtual void sendMessage( const cms::Topic* topic, const cms::Message* message );
+
+ /**
+ * Adds a listener for messages on the given topic.
+ * @param topic The topic to listen to.
+ * @param listener The listener to add.
+ */
+ virtual void addMessageListener( const cms::Topic* topic,
+ cms::MessageListener* listener );
+
+ /**
+ * Removes a listener from the given topic.
+ * @param topic The topic the listener was added to.
+ * @param listener The listener to remove.
+ */
+ virtual void removeMessageListener( const cms::Topic* topic,
+ cms::MessageListener* listener );
+
+ /**
+ * Sets the listener of transport exceptions.
+ */
+ virtual void setExceptionListener( cms::ExceptionListener* listener ){
+ exceptionListener = listener;
+ }
+
+ private:
+
+ /**
+ * NOTE(review): body not visible here; presumably closes the client
+ * socket and releases the IO objects built on top of it.
+ */
+ void closeSocket();
+
+ /**
+ * Builds the stomp destination name for a topic by prefixing the
+ * topic name with "/topic/".
+ */
+ std::string createDestinationName( const cms::Topic* topic ){
+ return ((std::string)"/topic/") + topic->getTopicName();
+ }
+
+ /**
+ * Returns the destination with its first 7 characters removed
+ * (7 is the length of the "/topic/" prefix). std::string::substr
+ * throws std::out_of_range if the destination is shorter than that.
+ */
+ std::string createTopicName( const std::string& destination ){
+ std::string topicName = destination.substr( 7 );
+ return topicName;
+ }
+
+ /**
+ * Sends a subscribe message for the given stomp destination.
+ */
+ void subscribe( const std::string& destination );
+
+ /**
+ * Sends an unsubscribe message for the given stomp destination.
+ */
+ void unsubscribe( const std::string& destination );
+
+ /**
+ * NOTE(review): body not visible here; presumably blocks the calling
+ * thread for the given number of milliseconds.
+ */
+ void milliSleep( const long millis );
+
+ /**
+ * Adapts the stomp message to a stomp frame and writes it to the
+ * broker.
+ */
+ void sendMessage( const StompMessage* msg );
+
+ /**
+ * Reports an exception to the exception listener, if one is set.
+ */
+ void notify( const ActiveMQException& ex );
+
+ /**
+ * Reads the next stomp frame from the broker and adapts it to a
+ * stomp message.
+ */
+ StompMessage* readNextMessage();
+
+ /**
+ * Reader loop executed by the reader thread.
+ */
+ void run();
+
+ /**
+ * The run method for the reader thread.
+ */
+ static void* runCallback( void* );
+
+ private:
+
+ /**
+ * Pool of STOMP destinations and the subscribers to those
+ * destinations.
+ */
+ DestinationPool destinationPool;
+
+ /**
+ * Listener to this transport channel.
+ */
+ TransportListener* listener;
+
+ /**
+ * The client socket.
+ */
+ io::Socket socket;
+
+ /**
+ * Indicates whether or not the flow of data to listeners is
+ * started.
+ */
+ bool started;
+
+ /**
+ * Flag to control the alive state of the IO thread.
+ */
+ bool killThread;
+
+ /**
+ * The broker host name.
+ */
+ std::string host;
+
+ /**
+ * The broker port.
+ */
+ int port;
+
+ /**
+ * Login credentials (presumably those passed to the constructor -
+ * TODO confirm).
+ */
+ std::string userName;
+ std::string password;
+
+ /**
+ * Synchronization object.
+ */
+ concurrent::Mutex mutex;
+
+ /**
+ * The reader thread.
+ */
+ pthread_t readerThread;
+
+ /**
+ * Protocol adapter for going between messages
+ * and stomp frames.
+ */
+ AggregateProtocolAdapter protocolAdapter;
+
+ /**
+ * IO for stomp messages.
+ */
+ StompIO* stompIO;
+
+ /**
+ * Buffers input from the socket stream.
+ */
+ io::BufferedInputStream* bufferedInputStream;
+
+ /**
+ * Buffers output to the socket stream.
+ */
+ io::BufferedOutputStream* bufferedOutputStream;
+
+ /**
+ * Listener to exceptions.
+ */
+ cms::ExceptionListener* exceptionListener;
+ };
+
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORT_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp Wed Mar 1 06:27:46 2006
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "StompTransportFactory.h"
+#include "StompTransport.h"
+#include <sstream>
+
+using namespace activemq::transport;
+using namespace activemq::transport::stomp;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Manufactures a transport that logs in with an empty user name and
+ * password.
+ * @param brokerUrl The broker address in the form accepted by parseUrl().
+ * When it is NULL or empty the transport targets 127.0.0.1:61626.
+ * @return A newly allocated StompTransport; the caller owns it.
+ */
+Transport* StompTransportFactory::createTransport( const char* brokerUrl ){
+
+    // Defaults used when the caller does not name a broker.
+    brokerHost = "127.0.0.1";
+    brokerPort = 61626;
+
+    // Honour the requested broker address (it used to be ignored).
+    if( brokerUrl && *brokerUrl != '\0' ){
+        parseUrl( brokerUrl );
+    }
+
+    return new StompTransport( brokerHost.c_str(), brokerPort, "", "" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Manufactures a transport for the given broker and login.
+ * @param brokerUrl The broker address in the form accepted by parseUrl().
+ * @param userName The login for the broker; NULL is treated as "".
+ * @param password The password for the broker login; NULL is treated as "".
+ * @return A newly allocated StompTransport; the caller owns it.
+ */
+Transport* StompTransportFactory::createTransport( const char* brokerUrl,
+                                                   const char* userName,
+                                                   const char* password )
+{
+    // Assigning a NULL pointer to std::string is undefined behaviour, so
+    // missing credentials are stored as empty strings.
+    this->userName = userName ? userName : "";
+    this->password = password ? password : "";
+
+    parseUrl( brokerUrl );
+    return new StompTransport( brokerHost.c_str(),
+                               brokerPort,
+                               this->userName.c_str(),
+                               this->password.c_str() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Splits a broker address into brokerHost and brokerPort.
+ * Accepted forms are "host", "host:port" and the same with a leading
+ * "scheme://" prefix, which is discarded. The port falls back to 61626
+ * when it is missing or empty; when the text after the last ':' is not a
+ * number the port also falls back to 61626 and the whole address is used
+ * as the host.
+ * @param brokerUrl The broker address; NULL is treated as "".
+ */
+void StompTransportFactory::parseUrl( const char* brokerUrl )
+{
+    const int defaultPort = 61626;
+
+    string url = brokerUrl ? brokerUrl : "";
+
+    // Drop a leading "scheme://" so that it does not end up in the host.
+    const string::size_type schemeEndIx = url.find( "://" );
+    if( schemeEndIx != string::npos ){
+        url.erase( 0, schemeEndIx + 3 );
+    }
+
+    // size_type is required here: storing the result in an unsigned int
+    // truncates npos on LP64 platforms and the npos test never matches.
+    string::size_type portStartIx = url.rfind( ':' );
+
+    brokerPort = defaultPort;
+    if( portStartIx != string::npos && portStartIx != (url.length()-1) ){
+
+        // Stream extraction reports failure through the stream state, not
+        // through an exception, so the state is what gets checked.
+        int parsedPort = 0;
+        stringstream stream( url.substr( portStartIx+1 ) );
+        if( stream >> parsedPort ){
+            brokerPort = parsedPort;
+        }
+        else{
+            // Not a port number - treat the whole address as the host.
+            portStartIx = string::npos;
+        }
+    }
+
+    brokerHost = url.substr( 0, portStartIx );
+}
+
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL
Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h Wed Mar 1 06:27:46 2006
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORTFACTORY_H_
+#define ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORTFACTORY_H_
+
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/TransportFactory.h>
+#include <string>
+
+namespace activemq{
+namespace transport{
+namespace stomp{
+
+    /**
+     * TransportFactory implementation that creates StompTransport objects.
+     * @author Nathan Mittler
+     */
+    class StompTransportFactory : public TransportFactory{
+    public:
+
+        virtual ~StompTransportFactory(){}
+
+        /**
+         * Creates a transport that logs in with an empty user name and
+         * password.
+         * @param brokerUrl The URL of the broker.
+         * @return The new transport.
+         */
+        virtual Transport* createTransport( const char* brokerUrl );
+
+        /**
+         * Creates a transport that logs in with the given credentials.
+         * @param brokerUrl The URL of the broker.
+         * @param userName The login for the broker.
+         * @param password The password for the broker login.
+         * @return The new transport.
+         */
+        virtual Transport* createTransport( const char* brokerUrl,
+                                            const char* userName,
+                                            const char* password );
+
+    private:
+
+        /**
+         * Extracts the broker host and port from the URL and stores them
+         * in brokerHost and brokerPort.
+         */
+        void parseUrl( const char* brokerUrl );
+
+        // Broker address produced by the most recent URL parse.
+        std::string brokerHost;
+        int brokerPort;
+
+        // Credentials given to the most recent three-argument
+        // createTransport call.
+        std::string userName;
+        std::string password;
+    };
+}}}
+
+#endif /*ACTIVEMQ_TRANSPORT_STOMP_STOMPTRANSPORTFACTORY_H_*/
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h
------------------------------------------------------------------------------
svn:keywords = Date Author Id Revision HeadURL